2020-03-22 19:45:07
文档中说的是 "Allows the processing of multiple cURL handles asynchronously.",确实是异步。这里需要理解的是 curl_multi_select 这个方法,文档中是这么解释的:"Blocks until there is activity on any of the curl_multi connections."。了解一下常见的异步 I/O 模型(select、epoll 都很有名)就应该能理解。
// Build the individual requests, but do not execute them yet.
$ch_1 = curl_init('http://www.phpstudy.net/');
$ch_2 = curl_init('http://www.phpstudy.net/');
curl_setopt($ch_1, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch_2, CURLOPT_RETURNTRANSFER, true);

// Build the multi-curl handle and attach both easy handles to it.
$mh = curl_multi_init();
curl_multi_add_handle($mh, $ch_1);
curl_multi_add_handle($mh, $ch_2);

// Drive both transfers at the same time and stop at the first finished one.
$running = null;
do {
    curl_multi_exec($mh, $running);

    // Blocks until there is activity on any of the curl_multi connections.
    // 0 means the wait timed out without activity; any other value
    // (including -1, a select failure) falls through to the info check.
    $ch = curl_multi_select($mh);
    if ($ch !== 0) {
        $info = curl_multi_info_read($mh);
        if ($info) {
            var_dump($info);
            $response_1 = curl_multi_getcontent($info['handle']);
            echo "$response_1 \n";
            // The first completed transfer wins; abandon the other one.
            break;
        }
    }
} while ($running > 0);

// Close the handles.
curl_multi_remove_handle($mh, $ch_1);
curl_multi_remove_handle($mh, $ch_2);
curl_multi_close($mh);

// As set up here, the loop exits as soon as select yields a result and the
// curl resources are removed, which achieves the goal of cancelling the
// remaining HTTP request.

// 2.swoole_client
swoole_client 提供了异步模式,我竟然把这个忘了。下面代码中的 sleep 方法($cli->sleep())需要 swoole 版本大于等于 1.7.21,我还没升到这个版本,所以直接 exit 也可以。
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);

// Register the event callbacks.
$client->on("connect", function ($cli) {
    // Header lines are separated by CRLF and must not start with whitespace,
    // otherwise they are not parsed as separate header fields.
    $req = "GET / HTTP/1.1\r\n"
        . "Host: www.phpstudy.net\r\n"
        . "Connection: keep-alive\r\n"
        . "Cache-Control: no-cache\r\n"
        . "Pragma: no-cache\r\n\r\n";

    // NOTE(review): the original loop bound was lost when this snippet was
    // extracted; 3 is a placeholder - confirm the intended request count.
    $requestCount = 3;
    for ($i = 0; $i < $requestCount; $i++) {
        $cli->send($req);
    }
});

$client->on("receive", function ($cli, $data) {
    echo "Received: " . $data . "\n";
    // Stop at the first response. With swoole >= 1.7.21, $cli->sleep() can be
    // called here instead of terminating the whole script.
    exit(0);
});

$client->on("error", function ($cli) {
    echo "Connect failed\n";
});

$client->on("close", function ($cli) {
    echo "Connection close\n";
});

// Open the network connection.
// NOTE(review): the server address was stripped from the original snippet
// (it read connect('', 80, 1)). Resolving the host named in the Host header
// is an assumption - confirm the intended address.
$client->connect(gethostbyname('www.phpstudy.net'), 80, 1);

// 3.process
哎,竟然差点忘了 swoole_process, 这里就不用 pcntl 模块了。但是写完发现,这其实也不算是中断请求,而是哪个先到读哪个,忽视后面的返回值。
/**
 * Worker entry point: runs in the child process and writes a fake response
 * to the pipe shared with the parent.
 *
 * @param swoole_process $process The process object handed in by swoole.
 */
function process(swoole_process $process)
{
    $response = 'http response';
    $process->write($response);
    echo $process->pid, "\t", $process->callback . PHP_EOL;
}

$workers = [];
$worker_num = 3; // number of processes to create
$finished = false;
$lock = new swoole_lock(SWOOLE_MUTEX);

for ($i = 0; $i < $worker_num; $i++) {
    $process = new swoole_process('process');
    //$process->useQueue();
    $pid = $process->start();
    $workers[$pid] = $process;
}

foreach ($workers as $pid => $process) {
    // Original author's note: child processes will also include this event.
    swoole_event_add($process->pipe, function ($pipe) use ($process, $lock, &$finished) {
        $lock->lock();
        // Always drain the pipe, even for late responses: data left unread
        // would presumably keep the readable event firing.
        $data = $process->read();
        if (!$finished) {
            // Only the first response to arrive is reported.
            $finished = true;
            echo "RECV: " . $data . PHP_EOL;
        }
        $lock->unlock();
        // Each worker writes exactly once, so stop watching its pipe.
        swoole_event_del($pipe);
    });
}

// Reap every worker so that no zombie processes are left behind.
for ($i = 0; $i < $worker_num; $i++) {
    $ret = swoole_process::wait();
    $pid = $ret['pid'];
    echo "Worker Exit, PID=" . $pid . PHP_EOL;
}

// 4.pthreads
编译 pthreads 模块时,提示 PHP 编译时必须打开 ZTS,所以貌似必须用 thread safe 版本才能使用。wamp 中的多个 PHP 版本正好是 TS 的,所以直接下了个 dll,按文档中的说明复制到对应目录,就在 Windows 下测试了。还没完全理解,查到有文章说 PHP 的 pthreads 和 POSIX pthreads 是完全不一样的。代码有些烂,还需要多看看文档,体会一下。
/**
 * Shared object through which the worker threads publish the response.
 */
class Foo extends Stackable
{
    public $url;
    public $response = null;

    public function __construct()
    {
        $this->url = 'http://www.phpstudy.net';
    }

    public function run()
    {
    }
}

/**
 * Worker thread that fills in the shared response if nobody has done so yet.
 */
class Process extends Worker
{
    private $text = "";
    // Declared explicitly; the original assigned it without a declaration.
    private $object;

    public function __construct($text, $object)
    {
        $this->text = $text;
        $this->object = $object;
    }

    public function run()
    {
        // Loop until some thread (this one or another) has set the response.
        while (is_null($this->object->response)) {
            print " Thread {$this->text} is running\n";
            $this->object->response = 'http response';
            sleep(1);
        }
    }
}

$foo = new Foo();

$a = new Process("A", $foo);
$a->start();

$b = new Process("B", $foo);
$b->start();

// Wait for both workers before reading the result; without this the echo
// below can run before either thread has written the response.
$a->shutdown();
$b->shutdown();

echo $foo->response;

// 5.yield
<?php

/**
 * Single-threaded UDP server that drives generator-based request handlers.
 *
 * For every datagram received on the listening socket the handler is called.
 * If it returns a generator, the AsyncTask it yields is sent to a backend
 * over a fresh UDP socket; the generator is resumed with the backend reply,
 * or has a "Timeout" exception thrown into it once the deadline passes.
 */
class AsyncServer
{
    protected $handler;
    protected $socket;
    // socket key => [socket, coroutine, deadline in ms]
    protected $tasks = [];
    // deadline in ms => [socket key => socket key]
    protected $timers = [];

    /**
     * @param callable $handler Called as $handler($socket, $data, $len, $ip, $port);
     *                          expected to return a Generator that yields an AsyncTask.
     */
    public function __construct(callable $handler)
    {
        $this->handler = $handler;

        $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
        if (!$this->socket) {
            die(socket_strerror(socket_last_error()) . "\n");
        }
        if (!socket_set_nonblock($this->socket)) {
            die(socket_strerror(socket_last_error()) . "\n");
        }
        // NOTE(review): the bind address was stripped from the original
        // snippet (it read ""); 0.0.0.0 is an assumption - confirm.
        if (!socket_bind($this->socket, "0.0.0.0", 1234)) {
            die(socket_strerror(socket_last_error()) . "\n");
        }
    }

    /**
     * Builds an array key for a socket. Sockets are resources before PHP 8
     * and objects from PHP 8 on; neither is usable as an array key directly.
     */
    protected function socketKey($socket)
    {
        return is_resource($socket) ? (int) $socket : spl_object_hash($socket);
    }

    /**
     * Event loop; never returns.
     */
    public function Run()
    {
        while (true) {
            $now = microtime(true) * 1000;

            // Expire overdue tasks. Every bucket is checked instead of
            // stopping at the first future one, because the array is in
            // insertion order and is not guaranteed to be sorted by deadline.
            foreach ($this->timers as $time => $keys) {
                if ($time > $now) {
                    continue;
                }
                foreach ($keys as $key) {
                    list($socket, $coroutine) = $this->tasks[$key];
                    unset($this->tasks[$key]);
                    socket_close($socket);
                    $coroutine->throw(new Exception("Timeout"));
                }
                unset($this->timers[$time]);
            }

            // Wait (up to 1000 microseconds) for the listening socket or any backend socket.
            $reads = array($this->socket);
            foreach ($this->tasks as list($socket)) {
                $reads[] = $socket;
            }
            $writes = null;
            $excepts = null;
            if (!socket_select($reads, $writes, $excepts, 0, 1000)) {
                continue;
            }

            foreach ($reads as $one) {
                $len = socket_recvfrom($one, $data, 65535, 0, $ip, $port);
                if (!$len) {
                    continue;
                }

                if ($one == $this->socket) {
                    // New client request: start a coroutine for it.
                    $handler = $this->handler;
                    $coroutine = $handler($one, $data, $len, $ip, $port);
                    if (!$coroutine) {
                        // The handler completed synchronously.
                        continue;
                    }

                    $task = $coroutine->current();

                    $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
                    if (!$socket) {
                        $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));
                        continue;
                    }
                    if (!socket_set_nonblock($socket)) {
                        $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));
                        continue;
                    }

                    socket_sendto($socket, $task->data, $task->len, 0, $task->ip, $task->port);

                    // Integer milliseconds: float array keys are truncated
                    // implicitly, which is deprecated in recent PHP versions.
                    $deadline = (int) ($now + $task->timeout);
                    $key = $this->socketKey($socket);
                    $this->tasks[$key] = [$socket, $coroutine, $deadline];
                    $this->timers[$deadline][$key] = $key;
                } else {
                    // Backend reply: resume the waiting coroutine with it.
                    $key = $this->socketKey($one);
                    list($socket, $coroutine, $deadline) = $this->tasks[$key];
                    unset($this->tasks[$key]);
                    unset($this->timers[$deadline][$key]);
                    if (empty($this->timers[$deadline])) {
                        unset($this->timers[$deadline]);
                    }
                    socket_close($socket);
                    $coroutine->send(array($data, $len));
                }
            }
        }
    }
}

/**
 * Description of one outbound request: payload, destination and timeout (ms).
 */
class AsyncTask
{
    public $data;
    public $len;
    public $ip;
    public $port;
    public $timeout;

    public function __construct($data, $len, $ip, $port, $timeout)
    {
        $this->data = $data;
        $this->len = $len;
        $this->ip = $ip;
        $this->port = $port;
        $this->timeout = $timeout;
    }
}

/**
 * Convenience wrapper so handlers can write `yield AsyncSendRecv(...)`.
 */
function AsyncSendRecv($req_buf, $req_len, $ip, $port, $timeout)
{
    return new AsyncTask($req_buf, $req_len, $ip, $port, $timeout);
}

/**
 * Generator-based handler: forwards the request to the backend and relays
 * the reply (or the error message) back to the original client.
 */
function RequestHandler($socket, $req_buf, $req_len, $ip, $port)
{
    try {
        // NOTE(review): the backend address was stripped from the original
        // snippet (it read ""); 127.0.0.1 is an assumption - confirm.
        list($rsp_buf, $rsp_len) = (yield AsyncSendRecv($req_buf, $req_len, "127.0.0.1", 2345, 3000));
    } catch (Exception $ex) {
        $rsp_buf = $ex->getMessage();
        $rsp_len = strlen($rsp_buf);
    }
    socket_sendto($socket, $rsp_buf, $rsp_len, 0, $ip, $port);
}

// Pass the handler by name; a bare identifier is read as an undefined constant.
$server = new AsyncServer('RequestHandler');
$server->Run();
封装AsyncSendRecv接口,调用形如yield AsyncSendRecv(),更加自然;


