老早以前就想写这篇文章了,奈何一直觉得自己对 PHP 内核源码和协程概念了解的都还不够透彻,加之 PHP 世界确实比较少谈论包括协程在内的并发解决方案,所以一直搁置。今天把拖延症治疗一下,顺便再立个 flag : 后续陆续写一系列 PHP 源码分析、 Opcache / Swoole 等扩展源码分析的文章,作为学习笔记,加油!

回忆在我学习协程相关内容的过程中,读了很多文档和博客,可是很多都写的晦涩难懂,读完还是觉得云里雾里。个人很认同费曼学习法,所以此文力求深入浅出,尽量做到即使读者之前不了解协程,读完也能够解惑。正因为如此,行文难免有些“废话”,有基础的读者可以自行跳过。

本文完整源码:github.com/Quarkay/BlogFragments/tree/master/php-coroutine

网络并发处理方案

一个编程概念的出现,往往是在某一时期,软件开发遇到了具体的问题,于是一步步改进现有技术手段,总结出方法,最后提出抽象好的解决方案,再用一个名词来概括。多进程、多线程、协程等等,就是这样一个个概括名词。

这些设计的出现和演化,与网络编程很有关系,例如经典的 C10K 问题。以服务端为例,监听端口,然后等待客户端连接,一个服务端可能需要同时服务大量客户端,从而在某一时刻可能形成大量并发连接。

多进程

接触编程,就一定会了解到进程 —— 操作系统调度的对象。为了同时运行多个任务,操作系统以适当的方式让不同进程轮流切换使用 CPU 资源。对用户来说,看起来就好像同时在跑,也就是所谓的时间片轮转。很显然,上面服务端遇到的同时处理大量客户端连接的问题,多进程是一个解决方案,来一个连接就起一个进程,由该进程去处理就好了,负责监听端口的进程继续监听新来的连接。

多线程

为了处理更大量的并发连接,就得改进现有方案。人们发现进程切换开销太大,因为你得保存执行现场,切换回来之后又得对应恢复,然后才能接着执行。而并发数上来后进程就起太多了,此时操作系统就耗费了太多资源在进程切换上了。如果能减小这个开销,那势必能提升并发处理能力。于是就出现了线程 —— 操作系统调度的最小单位。

同一进程的多个线程共享了进程的很多资源,自然的,线程切换起来,保存和恢复执行环境的开销要比进程切换小一些。那我们来一个连接就起一个线程去处理任务,最后并发量上来之后,操作系统会多很多线程,从而整体切换开销比多进程的时候小很多。但是很显然,开销还是有的,并发处理能力虽比多进程强,但是瓶颈依旧存在,并发量再往上,就处理不了了,因为操作系统耗费大量资源去进行线程切换了。

协程

回忆多进程、多线程处理方案,其本质其实就是切换运行多个代码片段,同时各自有各自的运行上下文环境,操作系统会不断地对运行环境保存、恢复、继续执行。如果代码可以控制自身代码片段的执行顺序,自己调度各个代码片段,操作系统的调度消耗自然可以减小很多。同时,对诸如网络服务端这样的任务场景,如果能够根据网络事件,自行对代码片段进行调度、执行,更是合适,操作系统也不会出现大量的进程、线程,而被切换到的每一个代码片段在执行的时候也不需要占着 CPU 等待网络事件(因为本来就是根据网络事件才安排执行的)。于是乎,协程的概念就出来了。

就像打地鼠

我们需要的是,让代码可以直接控制自身代码片段根据需要“跳来跳去”交替执行,省掉操作系统的切换开销。既然要自己调度,最基础的实现方式就是做一个任务队列,根据需要把任务入列,并不断地取出执行即可。这么讲不太直观,以 HTTP 服务器为例,同时来了多个并发连接,如果其中一个连接能读,就把执行读操作的片段和连接信息送入执行队列,如果其中一个能执行写的操作,就把写操作的片段和连接信息送入执行队列。后续不断地取出任务执行对应片段,直观上就是代码片段交替着运行了。这里的任务,实质上就是所谓的协程了。

协程调度的实现

对于协程的实现,最朴素的想法应该是利用 goto 语法 —— 因为要自己控制“跳来跳去”嘛。但是 goto 不能跨函数跳转,其次想要实现双向通信,只能通过各 label 片段共享的变量作为通信手段,所以不合适且需要特殊处理通信动作。那 C 语言里面的 setjmp() / longjmp() 可以吗?其既保存了部分执行环境信息,又可以跨函数跳转,当然是满足要求的,但我们讨论的是纯 PHP 的实现。那 PHP 有没有满足要求的语法呢?有的 —— yield 语法。

yield 语法

最早接触到协程概念,是在学 Python 的时候,那时候提到并发解决方案就必然提到 Tornado ,一看 Tornado 文档,充斥着不明所以的 yield 语法,顿感高级:

代码片段1:Tornado 早期版本文档摘录

class GenAsyncHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        do_something_with_response(response)
        self.render("template.html")

而 PHP 则是从 PHP 5.5.0 版本开始支持 Generator 与协程。整体而言被提及较少,反而 PHP-fpm (多进程)方案更普及。这也是为什么 PHP 常规技术栈常被诟病并发量上来就支撑不住。如果不了解 PHP 的 yield 语法,请先阅读 yield 相关语法文档: PHP IterationPHP Generator

yield 语法做到了前面提到的要求 —— 自动为我们保存好了各片段的执行环境,能跨函数“跳来跳去”地执行。同时提供了拿来即用的双向通信能力,用来实现协程非常合适。

生成 Generator

按照文档描述,一个 PHP 函数如果含有 yield 语法,那么调用该函数就会自动生成一个 Generator 对象。对调用者来说,一个 Generator 对象就是一段可控的代码片段,同时自动保存了上下文状态。那么怎么样控制该片段的执行呢?首先看看内部类 Generator 的摘要:

代码片段2:内部类 Generator 摘要

final class Generator implements Iterator {
    /* Methods */
    public current(): mixed
    public getReturn(): mixed
    public key(): mixed
    public next(): void
    public rewind(): void
    public send(mixed $value): mixed
    public throw(Throwable $exception): mixed
    public valid(): bool
    public __wakeup(): void
}

调用包含 yield 的函数之后,初始状态就自动保存在该对象里面。对调用者来说,交替调用 current()next() / send() 即可。每一次 next() 就是让其继续往下执行一段,遇到 yield 就会停止该片段的执行并返回调用处。如果需要互相通信, yield 后面可接返回值,调用者通过 current() 即可获取,如果要对代码片段传值,则调用者通过 send($val) 即可传递,对 Generator 代码片段而言,此 $val 即为 yield 表达式的返回值。测试代码如下:

代码片段3:Generator 对象测试代码

function foo()
{
    echo "foo\n";
    $sendVal = yield "yield1\n";

    var_dump($sendVal);
    echo "bar\n";
    $sendVal = yield "yield2\n";

    var_dump($sendVal);
    echo "quarkay\n";
    $sendVal = yield "yield3\n";

    var_dump($sendVal);
    echo "finally stmt\n";
}

$fooRes = foo();
var_dump($fooRes);

echo "==============\n";
$valueId = 1;
while ($fooRes->valid()) {
    echo "--------------\n";
    echo $fooRes->current();
    echo "--------------\n";
    echo $fooRes->send('test send value id: ' . $valueId++);
}

echo "==============\n";
var_dump($fooRes->current());
echo "==============\n";
$fooRes->next();

对应的输出结果如下:

object(Generator)#3 (0) {
}
==============
foo
--------------
yield1
--------------
string(21) "test send value id: 1"
bar
yield2
--------------
yield2
--------------
string(21) "test send value id: 2"
quarkay
yield3
--------------
yield3
--------------
string(21) "test send value id: 3"
finally stmt
==============
NULL
==============

从上面例子可以看出, yield 就相当于状态保存点,同时起到双向通信的作用。但是有几点需要留意:

  1. 调用函数生成 Generator 对象时,首个 yield 语句之前的代码并不会被执行,直到 valid() 或者 current() 被调用。
  2. 在 Generator 对象初始状态下,若直接调用 next() / send() 会运行到第二个 yield 暂停,且首个 yield 的返回值被忽略。
  3. 对已执行完毕,即调用 valid() 会返回 false 的 Generator 对象,调用 next() / send() 不会产生异常或警告。
  4. 某次调用 next() /send() 后若不是停止于 yield ,则再调用 current() 返回 NULL 。

调度 Generator

看到这里,有同学可能会心生疑惑,上述测试代码似乎跟协程关系不大?其实这么来想就好了,本身文件执行可以被认为是一个协程,执行过程中间歇地与生成的协程互相“跳来跳去”执行,这不就是简陋的协程吗?但是这种简陋的做法是不够用的,所以需要采取手段去调度协程的执行。前面提到的最简单方式,做一个存储任务与参数的队列即可,由调度器依次取出执行即可。于是乎我们的调度类呼之欲出。

代码片段4:调度类 TaskScheduler

class TaskScheduler
{
    private SplQueue $queue;

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    public function newTask(Generator $generator): void
    {
        $this->queue->enqueue($generator);
    }

    public function hasTask(): bool
    {
        return !$this->queue->isEmpty();
    }

    public function run(): void
    {
        if (!$this->hasTask()) {
            echo "error: run scheduler without task!\n";
            return;
        }

        while ($this->hasTask()) {
            $task = $this->queue->dequeue();
            if ($task->valid()) {
                $task->next();
                $this->queue->enqueue($task);
            }
        }
    }
}

function foo(): Generator
{
    for ($i = 0; $i <= 3; $i++) {
        echo "inside coroutine: ", $i, "\n";
        yield;
    }
}

$runner = new TaskScheduler();
$runner->newTask(foo());
$runner->newTask(foo());
$runner->run();

对应的输出结果如下:

inside coroutine: 0
inside coroutine: 1    # 首次运行的协程,执行到第二个 yield 才切换
inside coroutine: 0
inside coroutine: 1    # 同上
inside coroutine: 2
inside coroutine: 2
inside coroutine: 3
inside coroutine: 3

可以见到,已经实现了预期的效果,新生成的两个协程在交替执行,而协程自身通过 yield 让出执行权。虽然二者不断切换,但执行的上下文有自动保留和恢复。调度类本身则比较简单,就是一个入列出列,依次执行罢了。当然了,这种方案是最简方案而不是最优方案。细心的同学可能会发现两个问题:

  1. 说好的双向通信呢?貌似并没有做到和用到。
  2. 首次执行协程 next() 动作,运行到第二个 yield 语句才停下。

既然存在问题,那就想办法解决。 Generator 是个内部类,不便于继承和更改。所以采取封装的方案,将 Generator 作为成员封装到自己的任务类里面,再用任务类解决以上两个问题好了。

与 Generator 双向通信

为了实现双向通信,封装类必须提供相应的成员方法和存储空间。在提供通信能力的同时,可以顺便解决首次调用 next() 会直接跳到第二个 yield 的问题,只需要加一个标识位即可。这里把 Generator 封装到 Task 类里面,如下面代码所示。

代码片段5:将 Generator 封装到 Task 类

class Task
{
    private bool $firstRound;
    private Generator $generator;

    private mixed $sendValue;
    private mixed $returnValue;

    public function __construct(Generator $generator)
    {
        $this->firstRound = true;
        $this->generator = $generator;

        $this->sendValue = null;
        $this->returnValue = null;
    }

    public function sendMsg(mixed $sendValue): Task
    {
        $this->sendValue = $sendValue;
        return $this;
    }

    public function resetMsg(): Task
    {
        $this->sendValue = null;
        return $this;
    }

    public function recvMsg(): mixed
    {
        return $this->returnValue;
    }

    public function finished(): bool
    {
        return !$this->generator->valid();
    }

    public function go(): Task
    {
        if ($this->firstRound) {
            $this->firstRound = false;
            $this->returnValue = $this->generator->current();
            return $this;
        }

        if ($this->finished()) {
            echo "error: try to run already finished task!";
            return $this;
        }

        $this->generator->send($this->sendValue);
        $this->returnValue = $this->generator->current();
        $this->resetMsg();
        return $this;
    }
}

那么,此时对应的调度类也需要对应微调,因为其直接调度的对象类型从 Generator 对象变为了 Task 对象,主要是对其 run() 方法进行了修改,修改后如下:

代码片段6:修改 TaskScheduler 类 run() 方法

class TaskScheduler
{
    // 其它方法不作变更
    // ....

    public function run(): void
    {
        if (!$this->hasTask()) {
            echo "error: run scheduler without task!\n";
            return;
        }

        while ($this->hasTask()) {
            $task = $this->queue->dequeue();
            $task->go();
            if (!$task->finished()) {
                $this->queue->enqueue($task);
            }
        }
    }
}

测试封装的协程库

到目前为止,协程库的功能还较为有限,只是基本的调度和通信能力,同时解决了协程首次运行时需要特殊处理的问题。修改测试代码如下:

代码片段7:封装 Task 后修改测试代码

function foo(): Generator
{
    for ($i = 0; $i <= 3; $i++) {
        echo "inside coroutine: ", $i, "\n";
        $sendValue = yield;
        if ($sendValue) {
            echo "got \$val: ", $sendValue, "\n";
        }
    }
}

$runner = new TaskScheduler();
$taskA = new Task(foo());
$taskB = new Task(foo());
$runner->newTask($taskA->sendMsg("AAA"));
$runner->newTask($taskB);
$runner->run();

其输出如下:

inside coroutine: 0
inside coroutine: 0
got $val: AAA
inside coroutine: 1
inside coroutine: 1
inside coroutine: 2
inside coroutine: 2
inside coroutine: 3
inside coroutine: 3

可见此时不存在协程首次执行时,会运行到第二个 yield 的问题了。其次,通信能力正常,协程收到了 "AAA" 字符串并根据逻辑输出。有了基础骨架,后面根据需要在此基础上添加功能即可。

丰富协程库功能

若某个协程在执行网络等待、读取的动作,根据目前的基础骨架,不会执行协程切换动作,整个进程都会等待,直到该耗时操作完成。这里以 sleep() 代替耗时动作,可以这么修改 foo() 函数来测试:

代码片段8:sleep() 模拟耗时操作

function foo(): Generator
{
    for ($i = 0; $i <= 3; $i++) {
        sleep(3);
        echo "inside coroutine: ", $i, "\n";
        $sendValue = yield;
        if ($sendValue) {
            echo "got \$val: ", $sendValue, "\n";
        }
    }
}

运行修改后的测试代码,会发现每隔 3s 才会有新的输出和切换动作。很显然,如果协程采取这种方案处理网络任务是起不到预期效果的。

运行加入了 sleep() 函数的协程测试结果动图

其实最开始在讨论进程、线程、协程概念时已经提到,正确处理方式应当是由协程调度模块根据网络事件来决定哪些协程加入队列,这样协程被取出执行时就不需要再等待。而调度模块起始并不知道某个协程在等待什么网络连接和事件,这就又回到双向通信了,由协程的 yield 返回值告知调度模块即可。

也就是说,协程的 yield 返回值起到了改变协程调度模块动作的作用,把这种具备特殊作用的返回值,称为协程库的“系统调用”是可以的。除了网络相关指令,可能还有如新建协程、取消协程之类的“系统调用”(注意,这里指的是在运行的协程内进行调用)。

协程库“系统调用”指令

既然要实现类似系统调用的功能,一种直观的做法是将调用进行编号,然后协程在需要时,让返回值包含任务编号和参数列表。既然要编号,那就用枚举来对系统调用进行编号,然后由枚举值查表,找到对应动作,并将参数列表传入并调用。一种简单的编号和传参方案如下面代码所示:

代码片段9:系统调用枚举类型和 yield 进行系统调用的返回类型

enum TaskAPI
{
    case NewTask;
    case CancelSelf;
}

class SysCall
{
    public TaskAPI $api;
    public mixed $param1;
    public mixed $param2;
    public mixed $param3;

    public function __construct(TaskAPI $api, mixed $param1 = null, mixed $param2 = null, mixed $param3 = null)
    {
        $this->api = $api;
        $this->param1 = $param1;
        $this->param2 = $param2;
        $this->param3 = $param3;
    }
}

编号和传参如果按照上述代码编写,则只需要简单修改 TaskScheduler 的 run() 方法,去调用系统调用的查表和执行动作即可。而查表与执行这里实现的非常简单,就是一个简单的 switch-case ,作为学习案例,实现方式越简单越好。

代码片段10:修改 TaskScheduler 以支持系统调用

class TaskScheduler
{
    // 同前面一样,TaskScheduler 的其他方法不需要修改
    // ...

    public function run(): void
    {
        if (!$this->hasTask()) {
            echo "error: run scheduler without task!\n";
            return;
        }

        while ($this->hasTask()) {
            $task = $this->queue->dequeue();
            $task->go();
            if (!$task->finished()) {
                $this->queue->enqueue($task);
            }

            if ($task->recvMsg() instanceof SysCall) {
                $this->handleSyscall($task, $task->recvMsg());
            }
        }
    }

    private function popTask(Task $task): void
    {
        if (!$task->finished()) {
            $popped = $this->queue->pop();
            if ($popped !== $task) {
                echo "error: wrong task popped!\n";
            }
        }
    }

    public function handleSyscall(Task $task, SysCall $call): void
    {
        switch ($call->api) {
            case TaskAPI::NewTask:
                $this->newTask(new Task($call->param1));
                break;
            case TaskAPI::CancelSelf:
                $this->popTask($task);
                break;
            default:
                echo "error: unknown SysCall!\n";
                break;
        }
    }
}

根据这种实现, TaskAPI::NewTask 这个调用,很显然就是新建协程,参数只需要提供一个 Generator 就行。而 TaskAPI::CancelSelf 更简单,甚至不需要参数,只需要把前面已经在 run() 中重新入列的自己再取出来就可以了。注意,从队列尾部取出用的是 pop()。至此,我们可以编写系统调用相关的测试协程样例了:

代码片段11:协程进行系统调用的测试代码

function foo(): Generator
{
    for ($i = 0; $i <= 3; $i++) {
        echo "inside coroutine: ", $i, "\n";
        if ($i === 2) {
            yield new SysCall(TaskAPI::CancelSelf);
        }
    }
}

function bar(): Generator
{
    echo "Hello, I'm quarkay in bar.\n";
    yield new SysCall(TaskAPI::NewTask, foo());
    echo "bye bye, I'm quarkay in bar.\n";
}

$runner = new TaskScheduler();
$taskA = new Task(bar());
$runner->newTask($taskA);
$runner->run();

对应的运行结果如下:

Hello, I'm quarkay in bar.
bye bye, I'm quarkay in bar.
inside coroutine: 0
inside coroutine: 1
inside coroutine: 2

可见 bar() 成功的执行了新建协程指令,后续加入的协程正常运行,但在 $i === 2 成立时,其执行了取消自己的动作,于是后续动作不再继续执行。

协程库“网络调用”指令

看到这里,还记得是为了什么才开始讨论协程的“系统调用”吗?因为我们希望协程调度模块能根据网络事件,来决定哪些协程加入队列。而要做到这一点,得需要“系统调用”这种机制,让协程得以告诉调度模块,我在等待什么,你可以等时候到了,再让我入列接着跑。姑且称这种“系统调用”为“网络调用”指令好了,更加明确。

最常见的网络调用,无非是针对某个 socket ,看看是否可读、可写、出错。那么系统调用参数至少得包含 socket 信息、等待事件信息。而调度器得知后,首先应当把这个协程从队列尾部再取出来,因为你要等网络事件啊,这很正常。取出来之后呢?自然得保存一下,等后续网络事件来了再把它入列即可。那协程从队尾取出来后,具体怎么保存比较合适呢?网络事件发生后,调度器是能拿到 socket 信息和事件信息的,而系统调用的时候也有这两者,做个 HashMap 就很合适。一个简单的实现可分为如下步骤。

首先修改“系统调用”枚举,新增“网络调用”相关的项。

代码片段12:新增“网络调用”相关枚举项

enum TaskAPI
{
    // 其他不变,新增所需项
    // ...

    case WaitSocketAccept;
    case WaitSocketRead;
    case WaitSocketWrite;
}

此时可以修改调度器的 handleSyscall() 方法,补充与网络调用相关的 switch-case 分支。为了使用方便,对应于等待连接、可读、可写这三种网络事件,分别用到了两个 array 类型的成员变量,一个作为 socket 存储列表,一个作为从 socket 到 Task 的哈希表。

代码片段13:修改 TaskScheduler 以支持“网络调用”指令

class TaskScheduler
{
    // 其他不变,仅新增下面六个属性、修改 handleSyscall() 方法
    // ...

    private array $waitingToAcceptSockets = [];
    private array $waitingToAcceptTaskMap = [];
    private array $waitingToReadSockets = [];
    private array $waitingToReadTaskMap = [];
    private array $waitingToWriteSockets = [];
    private array $waitingToWriteTaskMap = [];

    public function handleSyscall(Task $task, SysCall $call): void
    {
        switch ($call->api) {
            case TaskAPI::NewTask:
                $this->newTask(new Task($call->param1));
                break;
            case TaskAPI::CancelSelf:
                $this->popTask($task);
                break;
            case TaskAPI::WaitSocketAccept:
                $this->popTask($task);
                $this->waitingToAcceptSockets[(int)$call->param1] = $call->param1;
                $this->waitingToAcceptTaskMap[(int)$call->param1] = $task;
                break;
            case TaskAPI::WaitSocketRead:
                $this->popTask($task);
                $this->waitingToReadSockets[(int)$call->param1] = $call->param1;
                $this->waitingToReadTaskMap[(int)$call->param1] = $task;
                break;
            case TaskAPI::WaitSocketWrite:
                $this->popTask($task);
                $this->waitingToWriteSockets[(int)$call->param1] = $call->param1;
                $this->waitingToWriteTaskMap[(int)$call->param1] = $task;
                break;
            default:
                echo "error: unknown SysCall!\n";
                break;
        }
    }
}

至此,调用“网络指令”就生效了,调度器会把调用指令的协程从执行队列取出,并保存到对应位置。现在的问题是,调度器怎么根据相关 socket 的事件再把协程放回执行队列呢?很显然无论如何,调度器总得找时机去查验各 socket 状态,一种实现方式是让这个查验的动作本身也成为一个普通的协程,只不过这个协程能操作上面新增的属性、取出保存的协程并送入执行队列。要做到这些,最方便的方式是使用 TaskScheduler 的成员方法,是的,对象的方法如果包含 yield ,调用它也会返回 Generator 对象,用其生成协程,操作这些数据再方便不过了。

代码片段14:修改 TaskScheduler 新增 socket 状态检查动作

class TaskScheduler
{
    // 新增 dispatchNetEvent() 方法
    // ...

    public function dispatchNetEvent(): Generator
    {
        while (true) {
            $rSocketList = array_merge(
                array_values($this->waitingToReadSockets),
                array_values($this->waitingToAcceptSockets),
            );
            $wSocketList = array_values($this->waitingToWriteSockets);
            $eSocketList = array_merge($rSocketList, $wSocketList);

            if (!$eSocketList) {
                if (!$this->hasTask()) {
                    yield new SysCall(TaskAPI::CancelSelf);
                }
                yield;
                continue;
            }

            $seconds = $this->hasTask() ? 0 : 1;
            $num = stream_select($rSocketList, $wSocketList, $eSocketList, $seconds);
            if (!$num) {
                yield;
                continue;
            }

            foreach ($rSocketList as $rSocket) {
                if (isset($this->waitingToAcceptTaskMap[(int)$rSocket])) {
                    $acceptSocket = stream_socket_accept($rSocket, 0);
                    $this->queue->enqueue($this->waitingToAcceptTaskMap[(int)$rSocket]->sendMsg($acceptSocket));
                    unset($this->waitingToAcceptSockets[(int)$rSocket]);
                    unset($this->waitingToAcceptTaskMap[(int)$rSocket]);
                }
                if (isset($this->waitingToReadTaskMap[(int)$rSocket])) {
                    $this->queue->enqueue($this->waitingToReadTaskMap[(int)$rSocket]->sendMsg($rSocket));
                    unset($this->waitingToReadSockets[(int)$rSocket]);
                    unset($this->waitingToReadTaskMap[(int)$rSocket]);
                }
            }
            foreach ($wSocketList as $wSocket) {
                $this->queue->enqueue($this->waitingToWriteTaskMap[(int)$wSocket]->sendMsg($wSocket));
                unset($this->waitingToWriteSockets[(int)$wSocket]);
                unset($this->waitingToWriteTaskMap[(int)$wSocket]);
            }

            foreach ($eSocketList as $eSocket) {
                echo "error: exception in socket" . $eSocket . "\n";
                unset($this->waitingToAcceptSockets[$eSocket]);
                unset($this->waitingToAcceptTaskMap[$eSocket]);
                unset($this->waitingToReadSockets[$eSocket]);
                unset($this->waitingToReadTaskMap[$eSocket]);
                unset($this->waitingToWriteSockets[$eSocket]);
                unset($this->waitingToWriteTaskMap[$eSocket]);
            }

            yield;
        }
    }
}

上述代码是很典型的 I/O 多路复用写法。值得讨论的是两处 continue 的判断方式,前者是在考虑如果当前没有需要等待的网络事件,那就得看看还有没有其他协程了,如果有那说明后续其他协程可能会产生网络调用指令,这个协程不能停,如果此时都没有其他协程了,那自然可以结束;后者是 I/O 多路复用的状态检查出结果时,若一个事件都没有,只是等待超时了才返回的,那就继续执行后续协程,等轮一遍再到队尾的自己,再次进行检查,避免浪费时间。当然了,如果此时本 I/O 状态检查协程是唯一在调度的协程,超时时间就可以不为 0 ,挂着等就行 —— 反正都只有自己了,来去切换还是自己,而且此时若超时为 0 且没有网络事件发生,还会让 CPU 跑切换跑到 100% 。

其次,以 WaitSocketAccept 调用为例,恢复运行时需要告知协程新来的网络连接的 socket 。既然能够双向通信那就用起来,调度器在将保存的协程送入队列时,顺便设置一下 yield 表达式的返回值,也就是 send($val) 中的 $val 即可。回忆前面 Task 类的实现,也就是通过 sendMsg($socket) 进行设置。

写完发现问题又来了,这个负责检查网络事件的协程什么时机入列呢?比较合适的方案是做两个标识位,以此标记:1.是否有协程在等待网络事件、2.是否启动了负责网络检查的协程。在调度器的 run() 方法中,每次切换时都进行判断,需要时就入列。对应地,在 dispatchNetEvent() 方法中,若等待列表为空,将对应的判断分支,修改为直接调用取消自身的系统调用即可,不见兔子不撒鹰嘛。但这里作为学习案例,采用更简单的方案,直接修改 run() 方法,在其调用首个用户协程之前,自动新建检查网络事件的协程。只需要加入一行代码即可做到。

代码片段15:修改调度器 run() 方法,自动新建网络事件检查协程

    public function run(): void
    {
        if (!$this->hasTask()) {
            echo "error: run scheduler without task!\n";
            return;
        }

        $this->newTask(new Task($this->dispatchNetEvent()));
        while ($this->hasTask()) {
            $task = $this->queue->dequeue();
            $task->go();
            if (!$task->finished()) {
                $this->queue->enqueue($task);
            }

            if ($task->recvMsg() instanceof SysCall) {
                $this->handleSyscall($task, $task->recvMsg());
            }
        }
    }

至此,具备简单网络处理能力的协程库就成型了,可以编写相关测试代码,这里编写了一个返回固定数据的 http-reply-server 作为测试。由一个 server 协程负责监听,一旦有新的连接就新建 reply 协程去处理,而 reply 协程先是等待建立的连接可读,随后读出数据,然后等待其可写,最后返回固定的 HTTP 报文信息。

代码片段16:测试“网络调用”指令

function listen()
{
    $socket = stream_socket_server("tcp://0.0.0.0:2333", $errno, $errMsg);
    if (!$socket) {
        echo "error: failed to listen, ${errMsg}\n";
        return null;
    }
    return $socket;
}

function demoHttpResp(): string
{
    return <<<http
HTTP/1.1 200 OK
Content-Length: 19
Content-Type: text/html
Server: Quarkay.Demo

hello from quarkay!
http;
}

function server(): Generator
{
    $listenSocket = listen();
    if (!$listenSocket) {
        yield new SysCall(TaskAPI::CancelSelf);
    }

    while (true) {
        $socket = yield new SysCall(TaskAPI::WaitSocketAccept, $listenSocket);
        stream_set_blocking($socket, 0);
        yield new SysCall(TaskAPI::NewTask, reply($socket));
    }
}

function reply($socket): Generator
{
    yield new SysCall(TaskAPI::WaitSocketRead, $socket);
    $data = stream_socket_recvfrom($socket, 1024);
    if (!$data) {
        echo "error: failed to read from socket!\n";
        goto shutdown;
    }

    yield new SysCall(TaskAPI::WaitSocketWrite, $socket);
    $len = stream_socket_sendto($socket, demoHttpResp());
    if (!$len) {
        echo "error: failed to write to socket!\n";
        goto shutdown;
    }

    shutdown:
    stream_socket_shutdown($socket, STREAM_SHUT_RDWR);
}

$runner = new TaskScheduler();
$server = new Task(server());
$runner->newTask($server);
$runner->run();

运行后进程会常驻,等待网络连接,随后返回固定的 HTTP 报文,如下所示:

$ curl http://127.0.0.1:2333
hello from quarkay!

虽然目前的处理很简陋,且实现方案不考虑性能,但如果进行压力测试,会发现协程方案的处理能力确实和 PHP-fpm 不在一个数量级。

无栈协程

目前为止,前面编写的所有协程处理都是无栈协程。正所谓由俭入奢易由奢入俭难,先了解无栈协程的蛋疼,然后再去了解有栈协程,就会感觉美滋滋。那无栈协程蛋疼在哪呢?我认为主要问题在于,多个 Generator 无法直接嵌套组合调用,也就无法形成简单而通用的封装。

以上面的测试代码为例,逻辑是 server() 协程负责 accpet() ,新连接到来就启用 reply() 协程去处理该连接。这两者不是直接调用,而是 server() 使用了系统调用,并将 reply() Generator 作为参数传递。而仔细看 reply() 部分,等待 I/O -> 操作 I/O 是很通用的做法,直接封装成带 yield 的函数,这里直接调用不是更方便吗?大概逻辑伪代码如下:

代码片段17:组合 Generator 伪代码示例

function reply($socket): Generator
{
    $data = IO_Read($socket);
    if ($data) {
        IO_Write($socket, $data);
        IO_Shutdown($socket);
    }
}

function IO_Read($socket): Generator
{
    yield new SysCall(TaskAPI::WaitSocketRead, $socket);
    $data = stream_socket_recvfrom($socket, 1024);
    if (!$data) {
        echo "error: failed to read from socket!\n";
        IO_Shutdown($socket);
        return null;
    }
    return $data;
}

// 其他两个封装类似
// ...

上述代码之所以是伪代码,是因为跑起来压根不符合预期。以 reply() 为例,其自身并不包含 yield 语句,所以压根就不是 Generator 。其次,它调用的 IO_*() 系列函数并不会执行,只会原地生成一个 Generator ,并且不再用到更谈不上执行。那这还怎么玩? Generator 之间没法嵌套组合起来调用啊,总不能都是裸调“系统调用”,也不进行封装,所有逻辑全写到一个 Generator 里面吧。

怎么样才能更舒服呢?当然是可以嵌套组合多个 Generator ,同时让其在调度器看来就是一个协程,其中任何嵌套层次的某一个步骤需要等待网络事件,则这个协程就挂起,恢复时从再挂起点正常恢复,无论其组合层次如何。怎么做到这点呢?一种实现方式是让组合的 Generator 返回给调度器,让调度器来操作。首先来看怎么返回给调度器,无非又是双向通信,也就是说把 Generator 通过 yield 返馈给调度器,例如这样:

代码片段18:嵌套组合 Generator 示意

function reply($socket): Generator
{
    $data = yield IO_Read($socket);
    // 其他不变
    // ...
}

运行到此 yield 语句时,对于调度器来说,切换协程时拿到的 yield 返回值就是一个 Generator (后续称其为被嵌套协程)。调度器就能据此判断此时的协程是在执行组合嵌套动作。那么很显然,当前的协程得先暂停存起来,后续继续执行被嵌套协程,直到拿到被嵌套协程的最终结果,然后一层一层往上恢复执行。用数据结构来说,这就叫栈。而 yield 语法不直接提供这种功能,所以叫无栈协程。虽然说称呼为无栈,但是这样修改调度器还是能做到有栈协程的效果。按照这种思路,一步一步分步实现。首先是 Task 的栈式结构,这个很简单:

代码片段19:修改 Task 类以支持栈式协程

class Task
{
    // 其他不变,仅以下新增
    // ...

    private ?self $parentTask = null;

    public function stackIn(Generator $gen): self
    {
        $childTask = new Task($gen);
        $childTask->parentTask = $this;
        return $childTask;
    }

    public function stackOut(): self|null
    {
        return $this->parentTask;
    }
}

入栈时机我们已经很明确了,就是在协程 yield 返回值为 Generator 的时候。那什么时候出栈返回到上一层呢?这个是需要约定的,根据什么约定,当然也可以通过 yield 返回值了,通过这个进行双向通信最简单。我们可以新建一个类来作为识别类型,里面能包含结果即可。

代码片段20:新建协程出栈结果封装类

class TaskStackReturn
{
    private mixed $result;

    public function __construct($result)
    {
        $this->result = $result;
    }

    public function getResult(): mixed
    {
        return $this->result;
    }
}

至此,所需数据结构、入栈识别方式、出栈识别方式都明确了,修改调度器几乎不需要太多思考。无非就是协程切换时判断 yield 返回类型,如果是 Generator 就入协程栈,然后把被嵌套协程放入执行队列;如果是 TaskStackReturn 类型,就弹出协程栈顶,并把新的栈顶放入执行队列。只有一点需要留意,就是嵌套协程出栈时,新的栈顶协程是需要被嵌套协程的执行结果的,这一点是通过对新栈顶协程调用 $parentTask->sendMsg($msg->getResult()) 传递的。

代码片段21:修改调度器以支持栈式协程

class TaskScheduler
{
    // 其他不需要修改,仅修改 run() 方法即可
    // ...

    public function run(): void
    {
        if (!$this->hasTask()) {
            echo "error: run scheduler without task!\n";
            return;
        }

        $this->newTask(new Task($this->dispatchNetEvent()));
        while ($this->hasTask()) {
            $task = $this->queue->dequeue();
            $task->go();
            if (!$task->finished()) {
                $this->queue->enqueue($task);
            }

            $msg = $task->recvMsg();
            if ($msg instanceof SysCall) {
                $this->handleSyscall($task, $msg);
            }

            if ($msg instanceof Generator) {
                $this->popTask($task);
                $childTask = $task->stackIn($msg);
                $this->queue->enqueue($childTask);
            }

            if ($msg instanceof TaskStackReturn) {
                $this->popTask($task);
                $parentTask = $task->stackOut();
                if ($parentTask) {
                    $this->queue->enqueue($parentTask->sendMsg($msg->getResult()));
                }
            }
        }
    }
}

经过这波加强,可以愉快地进行封装和互相嵌套调用了,那就把前面的伪代码改一改,让其可以真实运行:

代码片段22:修改测试代码,让伪代码得以正常运行

function reply($socket): Generator
{
    $data = yield IO_Read($socket);
    if ($data) {
        yield IO_Write($socket);
    }
}

function IO_Read($socket): Generator
{
    yield new SysCall(TaskAPI::WaitSocketRead, $socket);
    $data = stream_socket_recvfrom($socket, 1024);
    if (!$data) {
        echo "error: failed to read from socket!\n";
        stream_socket_shutdown($socket, STREAM_SHUT_RDWR);
    }

    yield new TaskStackReturn($data);
}

function IO_Write($socket): Generator
{
    yield new SysCall(TaskAPI::WaitSocketWrite, $socket);
    $len = stream_socket_sendto($socket, demoHttpResp());
    if (!$len) {
        echo "error: failed to write to socket!\n";
        stream_socket_shutdown($socket, STREAM_SHUT_RDWR);
    }

    yield new TaskStackReturn($len);
}

至此,基本的框架模子算是完成了,也自行封装了协程栈,算是一个有点意思的玩具代码了。

有栈协程

前面提到了,单纯的包含 yield 的函数,无法简单地互相嵌套组合,直到我们自行实现了协程栈。也从侧面说明,协程栈这个功能既非常需要,又实现起来不难。语言层面能不能直接支持一下呢?事实上,虽然 yield 不直接支持,但是 PHP 另外提供了 yield from 语法,这个做法还是蛮通用的,据我所知 Python 也是这么干的。

怎么理解语法层面提供的有栈协程呢,先回忆我们面对无栈协程怎么自己做到的有栈效果?无非就是按照嵌套层次依次操作的 Generator 。调度器调度的时候,利用栈式结构实现的。而现在这些活,语法层面用 yield from 帮你干了。于是乎,可以把通过 yield from 组合起来的若干个 Generator 看成一个整体的“大 Generator ”。对于调度器来说,你就是一个大 Generator 而已,至于具体的执行流程,不需要调度器再去操心帮你维护栈了。

其次是嵌套协程时的返回值问题,之前自己实现是通过封装 TaskStackReturn 类,然后在调度器出栈时转发实现的,如果使用 yield from ,直接 return 就可以了,也就是说 yield from 会自动调用 Generator 的 getReturn() 方法作为返回值,这一点我认为 PHP 文档并没有写清楚。

代码片段23:修改测试代码,演示 yield from

function reply($socket): Generator
{
    $data = yield from IO_Read($socket);
    if ($data) {
        yield from IO_Write($socket);
    }
}

function IO_Read($socket): Generator
{
    yield new SysCall(TaskAPI::WaitSocketRead, $socket);
    $data = stream_socket_recvfrom($socket, 1024);
    if (!$data) {
        echo "error: failed to read from socket!\n";
        stream_socket_shutdown($socket, STREAM_SHUT_RDWR);
    }

    return $data;
}

function IO_Write($socket): Generator
{
    yield new SysCall(TaskAPI::WaitSocketWrite, $socket);
    $len = stream_socket_sendto($socket, demoHttpResp());
    if (!$len) {
        echo "error: failed to write to socket!\n";
        stream_socket_shutdown($socket, STREAM_SHUT_RDWR);
    }

    return $len;
}

如果对这个版本的代码进行压力测试,会发现性能要比自行实现的协程栈要高不少,毕竟是直接在语言层面提供的方案,自然效率高很多。后续会抽空写一些 PHP 源码相关的文章,到时再来讨论语言层面的实现方案。当然了,如果使用了 yield from ,诸如 TaskStackReturn 类、Task 类的栈式结构和调度器中的相关类型判断与处理都是可以删除的,因为用不到自己实现的协程栈了嘛,这里就不再废话了。

其他 PHP 协程案例

如果搜索 PHP 协程相关文章,排除掉以扩展形式提供的方案,资源极其有限。鸟哥博客有一篇翻译的文章,我觉得写的非常好:在PHP中使用协程实现多任务调度。这里简要分析一下文中的思路,作为双向补充。

鸟哥的文章脉络分析

文章首先也是简要介绍 yield 语法,然后介绍调度器原理,并通过队列实现多个协程的调度。思路基本一致。到双向通信部分,调度器给协程传值,同样也是为其设置保存一个值,然后再次被调度执行时调用对应的 Generator 的 send($val) 方法,将值传递给协程;而协程给调度器传值,也是通过协程切换时,协程通过 yield 反馈的值的类类型来判断。其中对“系统调用”的实现稍有不同,做法是标识系统调用的特定类型,实际保存的是一个 callback ,调度器将自身和协程作为参数传递进去,这样做协程的动作可以很灵活,毕竟是直接操作调度器,与此同时其权限会比较大。网络调用部分,同样是调度器放一个额外的网络事件检查协程,一旦出现网络事件,对应的协程再度送回队列,思路基本一致。但是个人认为具体实现上有 bug ,下一小节来讨论这个 bug 。再后面也是介绍自行实现的协程栈,但是我认为实现的过于复杂了,不便与理解。这里截取代码贴出来讨论:

代码片段24:节选鸟哥文章 - 自实现透明协程栈

function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
    for (;;) {
        $value = $gen->current();
        if ($value instanceof Generator) {
            $stack->push($gen);
            $gen = $value;
            continue;
        }
        $isReturnValue = $value instanceof CoroutineReturnValue;
        if (!$gen->valid() || $isReturnValue) {
            if ($stack->isEmpty()) {
                return;
            }
            $gen = $stack->pop();
            $gen->send($isReturnValue ? $value->getValue() : NULL);
            continue;
        }
        $gen->send(yield $gen->key() => $value);
    }
}

注意这个函数里面也是有 yield 语句的,所以调用它也会生成一个 Generator 。这个方法的目的在于,用这个方法包装的 Generator 哪怕是用 yield 嵌套组合了其他 Generator ,也能自动维护协程栈,同时对调度器来说这都是透明的,调度器的代码不需要任何修改。怎么做到的呢?

代码首先生成了一个栈对象,然后获取当前 Generator 的 yield 反馈值。如果是 Generator 说明啥,说明当前 Generator 在嵌套, yield 后面跟的是另外一个 Generator 函数,于是当前 Generator 入栈,被嵌套的 Generator 更新为操作对象。 continue 后继续判断当前操作对象 yield 反馈值,如果还是 Generator 那就继续入栈、更新当前操作对象、然后 continue ,这没啥好说的;如果发现当前操作对象结束了,或者其 yield 后面跟的是特定返回值类型,那说明可以执行出栈操作,把操作对象更新为弹出来的 Generatro ,然后把结果(如果只是结束没有反馈特定结果类型那就是 null )通过 send($val) 的方式,传递给新的操作对象并催动它尝试往前执行到下一个 yield 。

如果没有出现前述两种情况 —— 嵌套组合 Generator 、执行完毕或者反馈了特定结果类,那说明当前操作对象命不该绝啊,继续运行好了。那怎么理解让其继续运行的语句 $gen->send(yield $gen->key() => $value); ?要知道 Generator 是被调度器调度的,哪怕你是嵌套组合的 Generator ,被嵌套的 Generator 的 yield 反馈值也得给到调度器啊,虽然层次关系对调度器是透明的。例如被嵌套的 Generator 通过 yield 反馈了一个“系统调用”的类类型值,你这个负责维护栈的,自然得如实反馈给调度器,所以 yield $gen->key() => $value ,这个是给到调度器的。调度器后续继续运行你,可能会给你传值,而你拿到传值,就应该把这个值给到当前操作的 Generator ,而这个传值就是 yield 语句的结果,所以你执行的是 $gen->send(yield $gen->key() => $value); 。只有这样,你才是真正合格的,透明的,负责维护栈的,额, Generator 。

鸟哥文章示例代码的问题

我把文章的代码整体复制下来运行,会发现执行一个 curl 操作测试一下之后,程序就挂了。

PHP Fatal error:  Uncaught ValueError: No stream arrays were passed in Scheduler.php:102
Stack trace:
#0 Scheduler.php(102): stream_select()
#1 Scheduler->ioPoll()
#2 [internal function]: Scheduler->ioPollTask()
#3 RetvalHelpers.php(65): Generator->send()
#4 [internal function]: stackedCoroutine()
#5 Task.php(46): Generator->send()
#6 Scheduler.php(35): Task->run()
#7 test.php(40): Scheduler->run()
#8 {main}
  thrown in Scheduler.php on line 102

后面调试了一下,发现有一个逻辑问题。咱们先看调度器的调度部分和网络事件检查的协程怎么写的,这两个是最关键的部分。

代码片段25:节选鸟哥文章 - 负责检查网络事件的协程

class Scheduler
{

    // 省略
    // ...

    public function run()
    {
        $this->newTask($this->ioPollTask());
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $retval = $task->run();
            if ($retval instanceof SystemCall) {
                try {
                    $retval($task, $this);
                } catch (Exception $e) {
                    $task->setException($e);
                    $this->schedule($task);
                }
                continue;
            }
            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }

    // 省略
    // ...

    protected function ioPoll($timeout)
    {
        $rSocks = [];
        foreach ($this->waitingForRead as list($socket)) {
            $rSocks[] = $socket;
        }
        $wSocks = [];
        foreach ($this->waitingForWrite as list($socket)) {
            $wSocks[] = $socket;
        }
        $eSocks = []; // dummy
        if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
            return;
        }
        foreach ($rSocks as $socket) {
            list(, $tasks) = $this->waitingForRead[(int)$socket];
            unset($this->waitingForRead[(int)$socket]);
            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }
        foreach ($wSocks as $socket) {
            list(, $tasks) = $this->waitingForWrite[(int)$socket];
            unset($this->waitingForWrite[(int)$socket]);
            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }
    }

    protected function ioPollTask()
    {
        while (true) {
            if ($this->taskQueue->isEmpty()) {
                $this->ioPoll(null);
            } else {
                $this->ioPoll(0);
            }
            yield;
        }
    }
}

其实问题还是比较明显的,首先,负责调度的 run() 方法,在协程未结束时,会将其再度入列,而 ioPollTask() 是个永远不结束的协程,所以其运行完成一定会再度入列。那按照代码逻辑,整体就得保证网络事件检查列表不为空,否则必定报以上错误。然而这个保证是无法得到的,因为不一定用户协程在什么时候提出“网络调用”请求。如果分析该文章给出的测试代码,会发现一定会出现这种问题。

修正鸟哥文章示例代码

那怎么解决呢?很简单,如果当前没有待监听事件,那就让出执行权就好,可以这么修正 ioPollTask() 即可,修正之后运行就符合预期了。

代码片段26:修正鸟哥文章示例代码(负责检查网络事件的协程)

class Scheduler
{
    // 其他不变
    // ...

    protected function ioPollTask()
    {
        while (true) {

            if (!$this->waitingForRead && !$this->waitingForWrite) {
                yield;    // 此时应当主动让出执行权
            }

            if ($this->taskQueue->isEmpty()) {
                $this->ioPoll(null);
            } else {
                $this->ioPoll(0);
            }
            yield;
        }
    }
}

协程的优点和缺陷

这个问题老生常谈,很多地方都有讨论,这里仅仅写一写我自己的粗浅看法,谨供参考。

优点‧处理网络并发

协程处理网络并发之所以合适,就是因为即使并发连接数上来,也不会产生大量进程、线程来让操作系统调度。但是我们心里要清楚,协程的调度也是需要耗费资源的,毕竟调度代码就在哪里跑,协程的上下文也需要地方保存,运行的时候也有恢复动作。所以协程库的设计是需要尽量让其跑得快,尽量让 CPU 全力去执行具体的任务。

优点‧不需要锁

多线程编程中,很容易出现状态竞争( Race Condition ),而协程本质上就是在一个进程、一个线程中跑,不涉及状态竞争问题。注意这里指的是通过 yield 实现的协程。基于此,自然就不需要考虑锁的问题,例如本文和鸟哥文章中所用到的队列、哈希表,都没有做锁保护。

缺点‧“传播污染”

这样实现的协程,无法整合、利用现有的同步代码。例如,如果在某个协程中执行了此函数调用: file_get_contents("https://www.quarkay.com") ,这种属于网络操作,那当运行到此协程的这一步时,整个线程都会停下来等你。也就是说,连这个内建函数都没法用了,就更别说其他耗时的同步代码了。怎么办?只能改,把它的功能也给改成协程版本的,用的时候嵌套组合其协程版本。可是我们已经知道,“网络调用”指令只能由协程库提供,每个库有可能实现的不一样,例如本文和鸟哥的文章,就通过不同的 yield 反馈类类型来告知调度器。这个就叫传播污染,你用了我的库,你所有的协程组件都得按照我的规则来,原有的同步代码都得对应改写,也许可以叫“生态”,但我觉得叫“传播污染”也很合适。

缺点‧多核的利用

考虑如下情景,网络并发很高的同时,具体的处理动作是 CPU 密集型的,这种情况怎么办?那当然是把 CPU 用到 100% ,能处理多少就处理多少。可我们已经知道,这种方式实现的协程是一个进程一个线程,最多跑满一个 CPU 核,这还怎么玩?无法完全发挥 CPU 的性能。所以对多核心的利用上,还得想想其他办法。

弥补 yield 的缺陷

软件开发相关知识,总是在不断地更新和进步, yield 语法至少有十几年历史(具体首次提出日期没查到), PHP 则是 2013 年就加入该语法。有一些方案,或称替代、或称弥补,值得在这里提一下。

新潮的 Fiber

2021 年底, PHP 发布 8.1.0 版本,开始支持 Fiber ,在官方中文文档中翻译为“纤程”。对 PHP 开发者来说, Fiber 完全可以作为 yield / yield from 的替代品,起到控制协程启停(自动保存现场)、双向通信的作用,只不过使用方式稍有变化,从 Fiber 类的摘要也能看出来,两者非常相似:

代码片段27:PHP Fiber 类摘要

final class Fiber {
    /* Methods */
    public __construct(callable $callback)
    public start(mixed ...$args): mixed
    public resume(mixed $value = null): mixed
    public throw(Throwable $exception): mixed
    public getReturn(): mixed
    public isStarted(): bool
    public isSuspended(): bool
    public isRunning(): bool
    public isTerminated(): bool
    public static suspend(mixed $value = null): mixed
    public static getCurrent(): ?Fiber
}

在我看来,相对于 Generator , Fiber 只是提供了性能更高的替代方案,因为保存与切换的是 C 执行流上下文,而不是 Zend Opcode 执行流上下文。“传播污染”和“无法利用多核”的缺陷依然存在。

Promise A+ 规范

这个规范只是用来组织逻辑关系的一种方案,从本质上讲与所谓异步、协程没有关系。 ReactPHP 这个仓库有对应的 PHP 实现,代码读起来有点绕需要耐心。个人认为是从 Javascript 而来的舶来品,使用体验上也许比较丝滑,但如果不是语言层面直接提供的,遇到问题跟踪调试会很难受。提出年代也比较久远了,值得学习和了解,但也仅限于此。对我个人而言,并不觉得套用规范比直接用 Generator / Fiber 方便(如果不考虑其生态配合)。

async / await 语法糖

如今 Python 和 Javascript 都有提供 async / await 语法糖。每一个 async / await 语法,都可以转写为对应的 Generator / Fiber 组合。同样的, ReactPHP 这个仓库也有对应的 PHP 实现,其基于 Fiber 和其自实现的 Promise 实现,更绕了,代码调试更为痛苦。同时,“传播污染”和“无法利用多核”的缺陷依然存在,只是换了个名字和写法而已。

Go 语言的 goroutine

可以说以上所有方案,都没有很好的解决问题,不管是语言自身提供的,还是通过应用语言的代码库实现的。如果激进一点,负责调度的模块直接内置在语言内部,作为语言的一部分“特色”,同时利用上多核,可不可以?当然可以。 Go 语言就是这么处理的,即所谓 goroutine 。我认为,可以把 goroutine 理解为可以并行运行的有栈协程。

入口主函数就可以认为是一个 goroutine ,如果遇到网络耗时任务完全可以放到另外的 goroutine 去处理,多个 goroutine 可以在多个 cpu 核心并行运行,多核自然利用起来了。同时各 goroutine 如果调用耗时操作,自动被切换到等待网络事件的 goroutine 队列,暂停执行。这样一来,“传播污染”和“无法利用多核”的缺陷被彻底解决。但同时,带来了与线程同步类似的问题, Go 中通过所谓 channel 、mutex 解决,与消息队列和线程锁类似。

Swoole 扩展

个人认为 Swoole 参考了 Go 的语法,两者有部分相似。 Swoole 几乎是目前 PHP 生态中最完备的协程化解决方案。但 Go 中内置模块与 goroutine 兼容,因为一开始就设计好了,内置了通过事件来调度的模块。而 PHP 中内置模块均为同步代码,若有耗时网络操作,即使套在 Swoole 协程中使用,该协程一样会被阻塞。也因为如此, Swoole Hook 了很多 PHP 内置模块,这样上层应用代码就不需要通过 Swoole 提供的基础功能,重新自实现原本 PHP 内置的耗时操作,例如网络调用相关的封装。从这点来说,非常有诚意,这种做法与 Python 生态中 gevent 所提供的“猴子补丁”类似,都是为了可以尽量少地改动代码来配合协程环境的效果。但是目前来看, Swoole 完全不可能合并进 php-src 。

Fiber 的上下文切换实现与 Swoole 完全一样(都使用的 boost 库的部分代码),对此感兴趣的同学可以阅读另外一篇博文:Swoole 源码分析1 - 协程的上下文切换 。相对 Go 而言,目前 Swoole 和 Fiber 缺少的是对多核的利用、与外部生态统一的通过事件来调度的模块以及与 PHP 内置模块的完美配合,但这些将来应该会逐步推进。

总结

有时候思考复杂问题,想着想着会忘了原本的目标是什么。并行化协程方案的出现,本质上就是让 CPU 别分心,尽可能让算力都拿来运行业务逻辑从而服务于人,而不是碌碌无为。最后,这个时期编写 PHP 应用,具体决定采用哪种方案,似乎有那么点尴尬哈哈。