首页 > 编程 > PHP > 正文

php如何使用命令行实现异步多进程模式的任务处理(代码)

2020-03-22 18:18:18
字体:
来源:转载
供稿:网友
本篇文章给大家带来的内容是关于php如何使用命令行实现异步多进程模式的任务处理(代码),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

用PHP来实现异步任务一直是个难题,现有的解决方案中:PHP知名的异步框架有 swoole 和 Workerman,但都是无法在 web 环境中直接使用的,即便强行搭建 web 环境,异步调用也是使用多进程模式实现的。但有时真的不需要用启动服务的方式,让服务端一直等待客户端消息,何况中间还不能改动服务端代码。本文就介绍一下不使用任何框架和第三方库的情况下,在 CLI 环境中如何实现多进程以及在web环境中的异步调用。

在 web 环境的异步调用

常用的方式有两种

1. 使用 socket 连接

这种方式就是典型的C/S架构,需要有服务端支持。

// 1. 创建socket套接字$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);// 2. 进行socket连接socket_connect($socket, 127.0.0.1 , 3939 //socket_set_nonblock($socket); // 以非阻塞模式运行,由于在客户端不实用,所以这里不考虑// 3. 向服务端发送请求socket_write($socket, $request, strlen($request));// 4. 接受服务端的回应消息(忽略非阻塞的情况,如果服务端不是提供异步服务,那这一步可以省略)$recv = socket_read($socket, 2048);// 5. 关闭socket连接socket_close($socket);

2. 使用 popen 打开进程管道

这种方式是使用操作系统命令,由操作系统直接执行。

本文讨论的异步调用就是使用这种方式。

$sf = /path/to/cli_async_task.php //要执行的脚本文件$op = call //脚本文件接收的参数1$data = base64_encode(serialize([ TestTask , arg1 , arg2 ])); //脚本文件接收的参数2pclose(popen( php $sf --op $op --data $data , r //打开之后接着就关闭进程管道,让该进程以守护模式运行echo PHP_EOL. 异步任务已执行。 .PHP_EOL;

这种方式的优点就是:一步解决,当前进程不需要任何开销。
缺点也很明显:无法跟踪任务脚本的运行状态。
所以重头戏会是在执行任务的脚本文件上,下面就介绍任务处理和多进程的实现方式。

在 CLI 环境的多进程任务处理

注意:多进程模式仅支持Linux,不支持Windows!!

这里会从0开始(未使用任何框架和类库)介绍每一个步骤,最后会附带一份完整的代码。

1. 创建脚本

任何脚本不可忽视的地方就是错误处理。所以写一个任务处理脚本首先就是写错误处理方式。

在PHP中就是调用 set_exception_handler set_error_handler register_shutdown_function 这三个函数,然后写上自定义的处理方法。

接着是定义自动加载函数 spl_autoload_register 免去每使用一个新类都要 require / include 的烦恼。

定义日志操作方法。

定义任务处理方法。

读取来自命令行的参数,开始执行任务。

2. 多进程处理

PHP 创建多进程是使用 pcntl_fork 函数,该函数会 fork 一份当前进程(影分身术),于是就有了两个进程,当前进程是主进程(本体),fork 出的进程是子进程(影分身)。需要注意的是两个进程代码环境是一样的,两个进程都是执行到了 pcntl_fork 函数位置。区别就是 getmypid 获得的进程号不一样,最重要的区分是当调用 pcntl_fork函数时,子进程获得的返回值是 0,而主进程获得的是子进程的进程号 pid。

好了,当我们知道谁是子进程后,就可以让该子进程执行任务了。

那么主进程是如何得知子进程的状态呢?
使用 pcntl_wait。该函数有两个参数 $status 和 $options ,$status 是引用类型,用来存储子进程的状态,$options 有两个可选常量WNOHANG| WUNTRACED,分别表示不等待子进程结束立即返回和等待子进程结束。很明显使用WUNTRACED会阻塞主进程。(也可以使用 pcntl_waitpid 函数获取特定 pid 子进程状态)

在多进程中,主进程要做的就是管理每个子进程的状态,否则子进程很可能无法退出而变成僵尸进程。

关于多进程间的消息通信
这一块需要涉及具体的业务逻辑,所以只能简单的提一下。不考虑使用第三方比如 redis 等服务的情况下,PHP原生可以实现就是管道通信和共享内存等方式。实现起来都比较简单,缺点就是可使用的数据容量有限,只能用简单文本协议交换数据。

如何手动结束所有进程任务

如果多进程处理不当,很可能导致进程任务卡死,甚至占用过多系统资源,此时只能手动结束进程。
除了一个个的根据进程号来结束,还有一个快速的方法是首先在任务脚本里自定义进程名称,就是调用cli_set_process_title函数,然后在命令行输入:ps aux|grep cli_async_worker |grep -v grep|awk {print $2} |xargs kill -9 (里面的 cli_async_worker 就是自定义的进程名称),这样就可以快速结束多进程任务了。

以下是完整的任务执行脚本代码:

可能无法直接使用,需要修改的地方有:

脚本目录和日志目录常量

自动加载任务类的方法(默认是加载脚本目录中以Task结尾的文件)

其他的如:错误和日志处理方式和文本格式就随意吧...

如果命名管道文件设置有错误,可能导致进程假死,你可能需要手动删除进程管道通信的代码。

多进程的例子:execAsyncTask( multi , [ test = [ a , b , c ], grab = [[ url = http://www.baidu.com , callback = http://localhost ]] ]);。执行情况可以在日志文件中查看。execAsyncTask函数参考【__使用popen打开进程管道__】。

 ?phperror_reporting(E_ALL ^ E_NOTICE ^ E_USER_WARNING);@ini_set( display_errors , 0);@ini_set( date.timezone , PRC chdir(__DIR__);/* 任务脚本目录 */defined( TASK_PATH ) or define( TASK_PATH , realpath(__DIR__ . /tasks /* 任务日志目录 */defined( TASK_LOGS_PATH ) or define( TASK_LOGS_PATH , __DIR__ . /tasks/logs if (!is_dir(TASK_LOGS_PATH)) @mkdir(TASK_LOGS_PATH, 0777, true);set_exception_handler(function($e) { $time = date( H:i:s , time()); $msg = sprintf( . h3 [%s] %s (%s) /h3 . /n . pre %s /pre , $time, $e- getMessage(), $e- getCode(), $e- getTraceAsString() file_put_contents(TASK_LOGS_PATH . /exception- .date( Ymd ). .log , $msg.PHP_EOL, FILE_APPEND|LOCK_EX);set_error_handler(function($errno, $errmsg, $filename, $line) { if (!(error_reporting() $errno)) return; ob_start(); debug_print_backtrace(); $backtrace = ob_get_contents(); ob_end_clean(); $datetime = date( Y-m-d H:i:s , time()); $msg = EOF[{$errno}]时间:{$datetime}信息:{$errmsg}文件:{$filename}行号:{$line}{$backtrace} file_put_contents(TASK_LOGS_PATH . /error- .date( Ymd ). .log , $msg.PHP_EOL, FILE_APPEND|LOCK_EX);register_shutdown_function(function() { $last_error = error_get_last(); if (in_array($last_error[ type ], array(E_ERROR, E_WARNING, E_USER_ERROR))) { debug_log( End. , true);function debug_log($log, $close=false) { html' target='_blank'>static $fp; if (!$fp) { $fp = fopen(TASK_LOGS_PATH . /debug- .date( Ym ). .log , a+  $log = [ . date( Y-m-d H:i:s ) . ] [Task@ . getmypid() . ] . trim($log) . PHP_EOL; if (flock($fp, LOCK_EX)) { fwrite($fp, $log); fflush($fp); flock($fp, LOCK_UN); } else { if ($close) fclose($fp);function call($job) { if (is_callable($job)) { $ret = call_user_func($job); } elseif (is_array($job) and is_callable(@$job[0])) { $ret = call_user_func_array($job[0], array_slice($job, 1)); } else throw new /Exception( 不是可执行的任务!  return $ret;function grab(array $job) { /* 消息数据为json,格式 url : fetch_url , //拉取的链接地址 method : request_method , //请求方法 data : post_data , //POST请求数据 args :[], //请求附加参数 headers|user_agent|proxy|timeout callback : callback_url , //回调地址(统一POST带回应数据) msg_id : message_id //消息ID $url = $job[ url  $headers = @$job[ args ][ headers ] ?: []; $_headers =  if (is_array($headers)) { foreach ($headers as $_k = $header) { if (!is_numeric($_k))  $header = sprintf( %s: %s , $_k, $header); $_headers .= $header . /r/n  $headers = Connection: close/r/n . $_headers; $opts = array( http = array( method = strtoupper(@$job[ method ] ?: get ), content = @$job[ data ] ?: null, header = $headers, user_agent = @$job[ args ][ user_agent ] ?: HTTPGRAB/1.0 (compatible) , proxy = @$job[ args ][ proxy ] ?: null, timeout = intval(@$job[ args ][ timeout ] ?: 120), protocol_version = @$job[ args ][ protocol_version ] ?: 1.1 , max_redirects = 3, ignore_errors = true $ret = @file_get_contents($url, false, stream_context_create($opts)); //debug_log($url. -- .strlen($ret)); if ($ret and isset($job[ callback ])) { $postdata = http_build_query(array( msg_id = @$job[ msg_id ] ?: 0, url = @$job[ url ], result = $ret $opts = array( http = array( method = POST , header = Content-type:application/x-www-form-urlencoded . /r/n , content = $postdata, timeout = 30 file_get_contents($job[ callback ], false, stream_context_create($opts)); //debug_log(json_encode(@$http_response_header)); //debug_log($job[ callback ]. -- .$ret2); return $ret;function clean($tmpdirs, $expires=3600*24*7) { $ret = []; foreach ((array)$tmpdirs as $tmpdir) { $ret[$tmpdir] = 0; foreach (glob($tmpdir.DIRECTORY_SEPARATOR. * ) as $_file) { if (fileatime($_file) (time()-$expires)) { if (@unlink($_file)) $ret[$tmpdir]++; return $ret;function backup($file, $dest) { $zip = new /ZipArchive(); if (!$zip- open($file, /ZipArchive::CREATE)) { return false; _backup_dir($zip, $dest); $zip- close(); return $file;function _backup_dir($zip, $dest, $sub= ) { $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $dir = opendir($dest); if (!$dir) return false; while (false !== ($file = readdir($dir))) { if (is_file($dest . $file)) { $zip- addFile($dest . $file, $sub . $file); } else { if ($file != . and $file != .. and is_dir($dest . $file)) { //$zip- addEmptyDir($sub . $file . DIRECTORY_SEPARATOR); _backup_dir($zip, $dest . $file, $file); closedir($dir); return true;
if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd; elseif (is_array($cmd)) { if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0]; $ret = call($cmd); break; case grab : //抓取网页 if (is_string($data)) $data = [ url = $data]; if (is_array($data)) $ret = grab($data); else throw new /Exception( 无效的命令参数! break; case clean : //清理缓存文件夹:dirs 需要清理的文件夹列表,expires 过期时间(秒,默认7天) if (isset($data[ dirs ])) { $ret = clean($data[ dirs ], @$data[ expires } else { $ret = clean($data); break; case backup : //备份文件:zip 备份到哪个zip文件,dest 需要备份的文件夹 if (isset($data[ zip ]) and is_dir($data[ dest ])) $ret = backup($data[ zip ], $data[ dest else throw new /Exception( 没有指定需要备份的文件! break; case require : //加载脚本文件 if (is_file($data)) $ret = require($data); else throw new /Exception( 不是可请求的文件! break; case test : sleep(rand(1, 5)); $ret = ucfirst(strval($data)). .PID: . getmypid(); break; case multi : //多进程处理模式 $results = $childs = []; $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . pipe. . posix_getpid(); if (!file_exists($fifo)) { if (!posix_mkfifo($fifo, 0666)) { //开启进程数据通信管道 throw new Exception( make pipe failed! //$shmid = shmop_open(ftok(__FILE__, h ), c , 0644, 4096); //共享内存 //shmop_write($shmid, serialize([]), 0); //$data = unserialize(shmop_read($shmid, 0, 4096)); //shmop_delete($shmid); //shmop_close($shmid); foreach($data as $_op = $_datas) { $_datas = (array)$_datas; //data 格式为数组表示一个 op 有多个执行数据 foreach($_datas as $_data) { $pid = pcntl_fork(); if ($pid == 0) { //子进程中执行任务 $_ret = execute_task($_op, $_data); $_pid = getmypid(); $pipe = fopen($fifo, w //写 //stream_set_blocking($pipe, false); $_ret = serialize([ pid = $_pid, op = $_op, args = $_data, result = $_ret]); if (strlen($_ret) 4096) //写入管道的数据最大4K $_ret = serialize([ pid = $_pid, op = $_op, args = $_data, result = [RESPONSE_TOO_LONG] //debug_log( write pipe: .$_ret); fwrite($pipe, $_ret.PHP_EOL); fflush($pipe); fclose($pipe); exit(0); //退出子进程 } elseif ($pid 0) { //主进程中记录任务 $childs[] = $pid; $results[$pid] = 0; debug_log( fork by child: .$pid); //pcntl_wait($status, WNOHANG); } elseif ($pid == -1) { throw new Exception( could not fork at . getmygid()); $pipe = fopen($fifo, r+ //读 stream_set_blocking($pipe, true); //阻塞模式,PID与读取的管道数据可能会不一致。 $n = 0; while(count($childs) 0) { foreach($childs as $i = $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); if (-1 == $res || $res 0) { $_ret = @unserialize(fgets($pipe)); //读取管道数据 $results[$pid] = $_ret; unset($childs[$i]); debug_log( read child: .$pid . - . json_encode($_ret, 64|256)); if ($n 1000) posix_kill($pid, SIGTERM); //超时(10分钟)结束子进程 usleep(200000); $n++; debug_log( child process completed. @fclose($pipe); @unlink($fifo); $ret = json_encode($results, 64|256); break; default: throw new /Exception( 没有可执行的任务! break; $t2 = microtime(true); $times = round(($t2 - $t1) * 1000, 2); $log = sprintf( [%s] %s -- (%s) %sms , strtoupper($op), @json_encode($data, 64|256), @strlen($ret) 65?$ret:@strlen($ret), $times); debug_log($log); return $ret;
if (false !== strpos(end($parts), _ )) { array_splice($parts, -1, 1, explode( _ , current($parts))); $filename = implode(DIRECTORY_SEPARATOR, $parts) . .php if ($filename = stream_resolve_include_path($filename)) { include $filename; } else if (preg_match( /.*Task$/ , $classname)) { //查找以Task结尾的任务脚本类 include TASK_PATH . DIRECTORY_SEPARATOR . $classname . .php } else { return false;}


以上就是php如何使用命令行实现异步多进程模式的任务处理(代码)的详细内容,PHP教程

郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表