第三方存储媒介

前面我们介绍了基于 Swoole 的 Process 及 Process\Pool 模块在 PHP 中实现多进程管理,但是多进程模式下进程间是相互隔离的,无法共享数据和变量,即便是通过 global 定义的全局或超全局变量,也只是在所属进程中有效,如果要在 Swoole 实现的多进程间共享数据,需要借助第三方存储媒介实现:

数据库:MySQL、MongoDB

缓存:Redis、Memcached

磁盘文件

但是这也会引入新的问题,多进程同时操作一条记录或一个文件存在并发访问问题,以数据库操作为例,两个进程可能会同时读取一条数据,或者一个进程对某条记录进行更新处理时,另一个进程也来读取这条记录并进行操作,会导致最终结果数据与预期不一致的情况,这个时候,我们就需要引入锁的概念,当一个进程(比如进程A)对某个记录进行写操作时,对该记录加锁,这样其它进程就无法操作该条记录, 直到进程 A 事务提交再释放这个锁,让其他进程可以进行操作。

内存共享

PHP 相关扩展

对于单机操作来说,除了这些第三方存储媒介之外,还可以通过共享内存的方式实现进程间数据读写操作,有多个 PHP 扩展可以支持共享内存数据操作:

Semaphore 扩展:可通过该扩展包提供的 shm_get_var 和 shm_put_var 函数实现内存共享数据的读写操作;

Shmop 扩展:可通过该扩展包提供的 shmop_read 和 shmop_write 函数实现内存共享数据的读写操作;

APCu(APC User Cache)扩展:可通过该扩展包提供的 apc_fetch 和 apc_store 实现内存共享数据的读写操作。

Swoole Table

但是上述扩展要么不支持锁,要么高并发时性能比较差,所以 Swoole 自己实现了一个共享内存读写工具 —— Swoole\Table,该工具是一个基于共享内存和锁实现的高性能并发数据结构,可用于解决多进程/多线程数据共享和同步加锁问题:

性能强悍,单线程每秒可读写200万次;

应用代码无需加锁,内置行锁自旋锁,所有操作均是多线程/多进程安全,用户层完全不需要考虑数据同步问题;

支持多进程,可用于多进程之间共享数据;

使用行锁,而不是全局锁,仅当 2 个进程在同一 CPU 时间,并发读取同一条数据才会进行发生抢锁。

Swoole\Table 支持以 Key-Value 方式读写,使用起来非常简单:

// 初始化一个容量为 1024 的 Swoole Table

$table = new \Swoole\Table(1024);

// 在 Table 中新增 id 列

$table->column('id', \Swoole\Table::TYPE_INT);

// 在 Table 中新增 name 列,长度为 50

$table->column('name', \Swoole\Table::TYPE_STRING, 10);

// 在 Table 中新泽 score 列

$table->column('score', \Swoole\Table::TYPE_FLOAT);

// 创建这个 Swoole Table

$table->create();

// 设置 Key-Value 值

$table->set('student-1', ['id' => 1, 'name' => '学小君', 'score' => 80]);

$table->set('student-2', ['id' => 2, 'name' => '学院君', 'score' => 90]);

// 如果指定 Key 值存在则打印对应 Value 值

if ($table->exist('student-1')) {
echo "Student-" . $table->get('student-1', 'id') . ':' . $table->get('student-1', 'name').":".

$table->get('student-1', 'score') . "\n";

}

// 自增操作

$table->incr('student-2', 'score', 5);

// 自减操作

$table->decr('student-2', 'score', 5);

// 表中总记录数

$count = $table->count();

// 删除指定表记录

$table->del('student-1');

此外 Swoole\Table 类还实现了迭代器接口,支持通过 foreach 进行遍历。

在 Laravel 中使用 Swoole\Table

如果要在 Laravel 中集成 Swoole 使用 Swoole\Table,以 LaravelS 扩展包为例,首先要在配置文件 config/laravels.php 中定义 swoole_tables 配置项:

'swoole_tables' => [

'ws' => [ // 表名,会加上 Table 后缀,比如这里是 wsTable

'size' => 102400, // 表容量

'column' => [ // 表字段,字段名为 value

['name' => 'value', 'type' => \Swoole\Table::TYPE_INT, 'size' => 8],

],

],

... // 还可以定义其它表

],

然后我们可以在代码中通过swoole实例上的wsTable属性访问 SwooleTable:

class WebSocketService implements WebSocketHandlerInterface

{
...

// 连接建立时触发

public function onOpen(Server $server, Request $request)

{
// 在触发 WebSocket 连接建立事件之前,Laravel 应用初始化的生命周期已经结束,你可以在这里获取 Laravel 请求和会话数据

// 调用 push 方法向客户端推送数据,fd 是客户端连接标识字段

Log::info('WebSocket 连接建立:' . $request->fd);

app('swoole')->wsTable->set('fd:' . $request->fd, ['value' => $request->fd]);

$server->push($request->fd, 'Welcome to WebSocket Server built on LaravelS');

}

// 收到消息时触发

public function onMessage(Server $server, Frame $frame)

{
foreach (app('swoole')->wsTable as $key => $row) {
if (strpos($key, 'fd:') === 0 && $server->exist($row['value'])) {
Log::info('Receive message from client: ' . $row['value']);

// 调用 push 方法向客户端推送数据

$server->push($frame->fd, 'This is a message sent from WebSocket Server at ' . date('Y-m-d H:i:s'));

}

}

}

...

}

然后我们参考在 Laravel 中集成 Swoole 实现 WebSocket 服务器这篇教程从客户端向 WebSocket 服务器发起请求,即可在最新日志文件中看到相应的日志信息:

[2020-04-24 19:39:03] local.INFO: WebSocket 连接建立:1

[2020-04-24 19:39:07] local.INFO: Receive message from client: 1