<?php
/**
 * Rolling-window helper for running many cURL requests concurrently.
 *
 * Tasks are queued with add_task() and executed by run(), which keeps at most
 * `$worker_num` transfers attached to a single curl_multi handle and attaches
 * the next queued task every time one finishes.
 */
class rolling_task_helper
{
    private static $instance;   // the single shared instance used by get_instance()
    private $tasks_can;         // queue of tasks waiting to be started
    private $mch;               // curl_multi handle
    private $default_options;   // cURL options applied when add_task() gets none
    private $exec_tasks;        // every task that has been started, keyed by handle key

    /**
     * Creates the curl_multi handle, an empty queue and the default cURL options.
     */
    public function __construct()
    {
        $this->mch = curl_multi_init();
        $this->tasks_can = array();
        $this->exec_tasks = array();
        $this->default_options = array(
            CURLOPT_CUSTOMREQUEST => 'GET',
            CURLOPT_NOBODY => false,
            CURLOPT_RETURNTRANSFER => true, // keep the body in memory instead of echoing it
            CURLOPT_TIMEOUT => 200,
            // NOTE(review): TLS peer/host verification is disabled by default, which
            // permits man-in-the-middle attacks. Kept for backward compatibility;
            // pass explicit options or call set_default_options() to enable it.
            CURLOPT_SSL_VERIFYPEER => false,
            CURLOPT_SSL_VERIFYHOST => false,
        );
    }

    /**
     * Releases the curl_multi handle.
     */
    public function __destruct()
    {
        // Since PHP 8 the handle is an object that is freed automatically and
        // curl_multi_close() has no effect, so only close real resources.
        if (is_resource($this->mch)) {
            curl_multi_close($this->mch);
        }
    }

    /**
     * Returns the tasks that are queued but not started yet.
     *
     * @return array list of task arrays as built by add_task()
     */
    public function get_task_can()
    {
        return $this->tasks_can;
    }

    /**
     * Replaces (does not merge) the default cURL options used by add_task().
     *
     * @param array $options map of CURLOPT_* constants to values
     */
    public function set_default_options($options)
    {
        $this->default_options = $options;
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return rolling_task_helper
     */
    public static function get_instance()
    {
        if (!isset(self::$instance)) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    /**
     * Queues a request for $url.
     *
     * @param string        $url              URL to fetch
     * @param callable|null $success_callback called as ($url, $content) when the transfer succeeds
     * @param callable|null $fail_callback    called as ($finished_task, $task_profile) when it fails
     * @param array|null    $options          cURL options; the defaults are used when null
     */
    public function add_task($url, $success_callback = null, $fail_callback = null, $options = null)
    {
        $ch = curl_init();
        if (!isset($options)) {
            $options = $this->default_options;
        }
        curl_setopt_array($ch, $options);
        // Set the URL last so it always wins over a CURLOPT_URL inside $options.
        curl_setopt($ch, CURLOPT_URL, $url);
        $this->tasks_can[] = array(
            'url' => $url,
            'ch' => $ch,
            'try_count' => 0,
            'fail_count' => 0,
            'success_callback' => $success_callback,
            'fail_callback' => $fail_callback
        );
    }

    /**
     * Builds the array key under which a cURL handle is stored in $exec_tasks.
     *
     * cURL handles are resources before PHP 8 and CurlHandle objects afterwards;
     * objects cannot be cast to string, so they are keyed by their object id.
     *
     * @param resource|object $ch cURL easy handle
     * @return string
     */
    private static function handle_key($ch)
    {
        return is_object($ch) ? 'h' . spl_object_id($ch) : (string)$ch;
    }

    /**
     * Moves the next queued task onto the curl_multi handle.
     *
     * @return bool true when a task was started, false when the queue is empty
     */
    private function new_worker()
    {
        if (empty($this->tasks_can)) {
            return false;
        }
        $task = array_shift($this->tasks_can);
        $task['try_count'] += 1;
        $this->exec_tasks[self::handle_key($task['ch'])] = $task;
        curl_multi_add_handle($this->mch, $task['ch']);
        return true;
    }

    /**
     * Runs all queued tasks, keeping at most $worker_num transfers active.
     *
     * A task's own callback takes precedence over the default one passed here.
     *
     * @param int           $worker_num               maximum number of concurrent transfers
     * @param callable|null $default_success_callback fallback called as ($url, $content)
     * @param callable|null $default_fail_callback    fallback called as ($finished_task, $task_profile)
     * @return array every task started so far by this object, keyed by handle key
     */
    public function run($worker_num = 5, $default_success_callback = null, $default_fail_callback = null)
    {
        // Number of handles attached to the multi handle and not yet reported as
        // finished. The loop is driven by this counter rather than by curl's
        // "still running" value, because that value is computed before finished
        // handles are replaced by newly queued ones.
        $in_flight = 0;
        for ($i = 0; $i < $worker_num; $i++) {
            if (!$this->new_worker()) {
                break;
            }
            $in_flight++;
        }

        $active = 0;
        while ($in_flight > 0) {
            // Drive the transfers until curl no longer asks to be called again.
            do {
                $mrc = curl_multi_exec($this->mch, $active);
            } while ($mrc == CURLM_CALL_MULTI_PERFORM);

            // Drain every completion message that is currently available.
            $handled = 0;
            while ($finished_task = curl_multi_info_read($this->mch)) {
                $handled++;
                $ch = $finished_task['handle'];
                $task_profile = &$this->exec_tasks[self::handle_key($ch)];

                if ($finished_task['result'] == CURLE_OK) {
                    // Fetch the response body collected via CURLOPT_RETURNTRANSFER.
                    $content = curl_multi_getcontent($ch);
                    if (isset($task_profile['success_callback'])) {
                        $task_profile['success_callback']($task_profile['url'], $content);
                    } elseif (isset($default_success_callback)) {
                        $default_success_callback($task_profile['url'], $content);
                    }
                } else {
                    $task_profile['fail_count'] += 1;
                    if (isset($task_profile['fail_callback'])) {
                        $task_profile['fail_callback']($finished_task, $task_profile);
                    } elseif (isset($default_fail_callback)) {
                        $default_fail_callback($finished_task, $task_profile);
                    }
                    // TODO: retry after a failure
                }

                // Detach the finished handle and free it (a no-op for PHP 8 objects).
                curl_multi_remove_handle($this->mch, $ch);
                if (is_resource($ch)) {
                    curl_close($ch);
                }
                $in_flight--;

                // Refill the freed slot from the queue.
                if ($this->new_worker()) {
                    $in_flight++;
                }
            }
            // Drop the reference so later writes cannot touch the last task by accident.
            unset($task_profile);

            if ($mrc != CURLM_OK) {
                break;
            }
            if ($handled > 0) {
                // Newly attached handles must be kicked off by curl_multi_exec first.
                continue;
            }
            if (!$active) {
                // Nothing is running and nothing was reported: avoid spinning forever.
                break;
            }
            // Wait for activity; select can return -1 when there is nothing to wait
            // on, in which case sleep briefly instead of busy-looping.
            if (curl_multi_select($this->mch) === -1) {
                usleep(1000);
            }
        }

        return $this->exec_tasks;
    }
}