curl实现并发请求

分类: 源代码 > PHP

<?php
/**
 * 并发任务辅助类
 *
 */

class rolling_task_helper
{
    private static $instance;  // 单例模式下的唯一实例
    private $tasks_can;  // 任务列表
    private $mch;  // multi curl 句柄
    private $default_options;  // 默认的curl配置
    private $exec_tasks;  // 执行了的任务


    public function __construct()
    {
        $this->mch = curl_multi_init();
        $this->tasks_can = array();
        $this->default_options = array(
            CURLOPT_CUSTOMREQUEST   => 'GET',
            CURLOPT_NOBODY          => false,
            CURLOPT_RETURNTRANSFER  => true,    // 返回的数据不直接打出,通过方法获取
            CURLOPT_TIMEOUT         => 200,
            CURLOPT_SSL_VERIFYPEER  => false,
            CURLOPT_SSL_VERIFYHOST  => false,
        );
    }

    public function __destruct()
    {
        curl_multi_close($this->mch);
    }

    public function get_task_can()
    {
        return $this->tasks_can;
    }

    public function set_default_options($options)
    {
        $this->default_options = $options;
    }

    public static function get_instance()
    {
        if (!isset(self::$instance)) {
            self::$instance = new rolling_task_helper();
        }
        return self::$instance;
    }

    public function add_task($url, $success_callback=null, $fail_callback=null, $options=null)
    {
        $ch = curl_init();
        if (!isset($options)) {
            $options = $this->default_options;
        }
        curl_setopt_array($ch, $options);
        curl_setopt($ch, CURLOPT_URL, $url);

        array_push(
            $this->tasks_can,
            array(
                'url'           => $url,
                'ch'            => $ch,
                'try_count'     => 0,
                'fail_count'    => 0,
                'success_callback' => $success_callback,
                'fail_callback'     => $fail_callback
            )
        );
    }

    private function new_worker()
    {
        if (empty($this->tasks_can)) {
            return false;
        }
        $task = array_shift($this->tasks_can);
        $task['try_count'] += 1;
        $this->exec_tasks[(string)$task['ch']] = $task;
        curl_multi_add_handle($this->mch, $task['ch']);
        return true;
    }

    public function run($worker_num=5, $default_success_callback=null, $default_fail_callback=null)
    {
        for ($i = 0, $has_worker = true; $i < $worker_num && $has_worker; $i++) {
            $has_worker = $this->new_worker();
        }

        do {
            // 不断执行并获取返回,直到被通知可以继续进行下一步
            do {
                $mrc = curl_multi_exec($this->mch, $active);
            } while($mrc == CURLM_CALL_MULTI_PERFORM);

            // 阻塞程序直到有任务返回
            curl_multi_select($this->mch);

            // 不断取出返回的任务信息直到没有消息可以取出
            while ($finished_task = curl_multi_info_read($this->mch)) {
                $task_profile = &$this->exec_tasks[(string)$finished_task['handle']];

                // 处理成功返回的任务
                if ($finished_task['result'] == CURLE_OK) {
                    // 取出返回的内容
                    $content = curl_multi_getcontent($finished_task['handle']);

                    if (isset($task_profile['success_callback'])) {
                        $task_profile['success_callback']($task_profile['url'], $content);
                    } elseif (isset($default_success_callback)) {
                        $default_success_callback($task_profile['url'], $content);
                    }

                    // 移除执行完的句柄,并释放句柄资源
                    curl_multi_remove_handle($this->mch, $finished_task['handle']);
                    curl_close($finished_task['handle']);
                    // 装载新的任务
                    $this->new_worker();
                } else {
                    $task_profile['fail_count'] += 1;

                    if (isset($task_profile['fail_callback'])) {
                        $task_profile['fail_callback']($finished_task, $task_profile);
                    } elseif (isset($default_fail_callback)) {
                        $default_fail_callback($finished_task, $task_profile);
                    }
                    // 移除执行完的句柄,并释放句柄资源
                    curl_multi_remove_handle($this->mch, $finished_task['handle']);
                    // TODO: 失败后重试
                    curl_close($finished_task['handle']);
                    // 装载新的任务
                    $this->new_worker();
                }
            }
        } while ($active && $mrc == CURLM_OK);
        return $this->exec_tasks;
    }
}

来源:原创 发布时间:2020-07-07 21:06:17