Actor
提供Actor
模式支持,助力游戲行業(yè)開發(fā)。EasySwoole
的Actor
采用自定義Process
作為存儲載體,以協(xié)程作為最小調(diào)度單位,利用協(xié)程Channel
做mail box
,而客戶端與Process
之間的通訊,采用UnixSocket
實現(xiàn),并且借助TCP
實現(xiàn)分布式的ActorClient
,超高并發(fā)下也能輕松應(yīng)對。
工作流程
一般來說有兩種策略用來在并發(fā)線程中進(jìn)行通信:共享數(shù)據(jù)和消息傳遞。使用共享數(shù)據(jù)方式的并發(fā)編程面臨的最大的一個問題就是數(shù)據(jù)條件競爭,當(dāng)兩個實例需要訪問同一個數(shù)據(jù)時,為了保證數(shù)據(jù)的一致性,通常需要為數(shù)據(jù)加鎖,而Actor模型采用消息傳遞機制來避免數(shù)據(jù)競爭,無需復(fù)雜的加鎖操作,各個實例只需要關(guān)注自身的狀態(tài)以及處理收到的消息。
Actor
是完全面向?qū)ο蟆o鎖、異步、實例隔離、分布式的并發(fā)開發(fā)模式。Actor
實例之間互相隔離,Actor
實例擁有自己獨立的狀態(tài),各個Actor
之間不能直接訪問對方的狀態(tài),需要通過消息投遞機制來通知對方改變狀態(tài)。由于每個實例的狀態(tài)是獨立的,沒有數(shù)據(jù)被共享,所以不會發(fā)生數(shù)據(jù)競爭,從而避免了并發(fā)下的加鎖問題。
舉一個游戲場景的例子,在一個游戲房間中,有5個玩家,每個玩家都是一個PlayerActor
,擁有自己的屬性,比如角色I(xiàn)D,昵稱,當(dāng)前血量,攻擊力等。游戲房間本身也是一個RoomActor
,房間也擁有屬性,比如當(dāng)前在線的玩家,當(dāng)前場景的怪物數(shù)量,怪物血量等。此時玩家A攻擊某個怪物,則PlayerActor-A
向RoomActor
發(fā)送一個攻擊怪物的指令,RoomActor
經(jīng)過計算,得出玩家A對怪物的傷害值,并給房間內(nèi)的所有PlayerActor
發(fā)送一個消息(玩家A攻擊怪物A,造成175點傷害,怪物A剩余血量1200點),類似此過程,每個PlayerActor
都可以得知房間內(nèi)發(fā)生了什么事情,但又不會造成同時訪問怪物A的屬性,導(dǎo)致的共享加鎖問題。
安裝
Actor
并沒有作為內(nèi)置組件,需要先引入包并進(jìn)行基礎(chǔ)配置才能夠使用。
composer require easyswoole/actor
使用
建立一個Actor
每一種對象(玩家、房間、甚至是日志服務(wù)也可以作為一種Actor
對象)都建立一個Actor
來進(jìn)行管理,一個對象可以擁有多個實例(Client)
并且可以互相通過信箱發(fā)送消息來處理業(yè)務(wù)。
<?php
namespace App\Player;
use EasySwoole\Actor\AbstractActor;
use EasySwoole\Actor\ActorConfig;
/**
* 玩家Actor
* Class PlayerActor
* @package App\Player
*/
class PlayerActor extends AbstractActor
{
/**
* 配置當(dāng)前的Actor
* @param ActorConfig $actorConfig
*/
public static function configure(ActorConfig $actorConfig)
{
$actorConfig->setActorName('PlayerActor');
$actorConfig->setWorkerNum(3);
}
/**
* Actor首次啟動時
*/
protected function onStart()
{
$actorId = $this->actorId();
echo "Player Actor {$actorId} onStart\n";
}
/**
* Actor收到消息時
* @param $msg
*/
protected function onMessage($msg)
{
$actorId = $this->actorId();
echo "Player Actor {$actorId} onMessage\n";
}
/**
* Actor即將退出前
* @param $arg
*/
protected function onExit($arg)
{
$actorId = $this->actorId();
echo "Player Actor {$actorId} onExit\n";
}
/**
* Actor發(fā)生異常時
* @param \Throwable $throwable
*/
protected function onException(\Throwable $throwable)
{
$actorId = $this->actorId();
echo "Player Actor {$actorId} onException\n";
}
}
注冊Actor服務(wù)
可以使用setListenAddress
和setListenPort
指定本機對外監(jiān)聽的端口,其他機器可以通過該端口向本機的Actor
發(fā)送消息。
public static function mainServerCreate(EventRegister $register) {
// 注冊Actor管理器
$server = \EasySwoole\EasySwoole\ServerManager::getInstance()->getSwooleServer();
\EasySwoole\Actor\Actor::getInstance()->register(PlayerActor::class);
\EasySwoole\Actor\Actor::getInstance()->setTempDir(EASYSWOOLE_TEMP_DIR)
->setListenAddress('0.0.0.0')->setListenPort('9900')->attachServer($server);
}
Actor實例管理
服務(wù)啟動后就可以進(jìn)行Actor
的操作,管理本機的Client
實例,則不需要給client
傳入$node
參數(shù),默認(rèn)的node
為本機,管理其他機器時需要傳入。
// 管理本機的Actor則不需要聲明節(jié)點
$node = new \EasySwoole\Actor\ActorNode();
$node->setIp('127.0.0.1');
$node->setListenPort(9900);
// 啟動一個Actor并得到ActorId 后續(xù)操作需要依賴ActorId
$actorId = PlayerActor::client($node)->create(['time' => time()]); // 00101000000000000000001
// 給某個Actor發(fā)消息
PlayerActor::client($node)->send($actorId, ['data' => 'data']);
// 給該類型的全部Actor發(fā)消息
PlayerActor::client($node)->sendAll(['data' => 'data']);
// 退出某個Actor
PlayerActor::client($node)->exit($actorId, ['arg' => 'arg']);
// 退出全部Actor
PlayerActor::client($node)->exitAll(['arg' => 'arg']);
架構(gòu)解讀
Actor
應(yīng)該叫ActorManager
更確切點,它用來注冊Actor
啟動Proxy
和ActorWorker
進(jìn)程。
當(dāng)你在業(yè)務(wù)邏輯里定義了幾種Actor
,比如RoomActor
、PlayerActor
,需要在SwooleServer
啟動時注冊它們。
具體就是在EasySwooleEvent.mainServerCreate
方法中添加如下代碼。
$actor = Actor::getInstance();
$actor->register(RoomActor::class);
$actor->register(PlayerActor::class);
$actorConf = Config::getInstance()->getConf('ACTOR_SERVER');
$actor->setMachineId($actorConf['MACHINE_ID'])
->setListenAddress($actorConf['LISTEN_ADDRESS'])
->setListenPort($actorConf['PORT'])
->attachServer($server);
其中ListenAddress
、ListenPort
為Proxy
進(jìn)程的監(jiān)聽地址端口,MachineId
為ActorWorker
進(jìn)程的機器碼。
MachineId
和IP:PORT
對應(yīng)。
attachServer
將開啟相應(yīng)數(shù)量的Proxy
進(jìn)程,以及前邊register
的ActorWorker
進(jìn)程。
工作原理
Proxy
進(jìn)程做消息中轉(zhuǎn),Worker
進(jìn)程做消息分發(fā)推送。來看個具體的例子:
游戲中玩家P請求進(jìn)入房間R,抽象成Actor
模型就是PlayerActor
需要往RoomActor
發(fā)送請求加入的命令。
那么這時候需要這樣寫:
\EasySwoole\Actor\Test\RoomActor::client($node)->send($roomActorId, [
'user_actor_id' => $userActorId,
'data' => '其他進(jìn)入房間的參數(shù)'
])
其中$roomActorId
和$userActorId
是事先xxActor::client()->create()
出來的。
上面那段代碼的意思就是往$roomActorId
的RoomActor
實例推送了一條$userActorId
玩家的UserActor
實例要加入房間的消息。
參數(shù)$node
用來尋址Proxy
,它由目標(biāo)Actor
實例的Worker.MachineId
決定,在本例中就是$roomActorId
被創(chuàng)建在了哪個MachineId
的WorkerProcess
。
通過$roomActorId
中的機器碼找到IP:PORT
,生成$node
。
send
時會創(chuàng)建一個協(xié)程TcpClient
,將消息發(fā)送給Proxy
,然后Proxy
將消息轉(zhuǎn)發(fā)(UnixClient)
至本機WorkerProcess
,WorkerProcess
收到消息,推送到具體的Actor
實例。
這樣就完成了從PlayerActor
到RoomActor
的請求通訊,RoomActor
收到請求消息并處理完成后,向PlayerActor
回發(fā)處理結(jié)果,用的是同樣的通訊流程。
如果是單機部署,可以忽略$node
參數(shù),因為所有通訊都是在本機進(jìn)行。
多機的話,需要自己根據(jù)業(yè)務(wù)來實現(xiàn)Actor
如何分布和定位。
主要屬性
machineId 機器碼
proxyNum 啟動幾個ProxyProcess
listenPort 監(jiān)聽port
listenAddress 監(jiān)聽ip
AbstractActor
Actor
實例的基類,所有業(yè)務(wù)中用到的Actor
都將繼承于`AbstractActor。例如游戲場景中的房間,你可以:
class RoomActor extends AbstractActor
工作原理
每個Actor
實例都維護一份獨立的數(shù)據(jù)和狀態(tài),當(dāng)一個Actor
實例通過client()->create()
后,會開啟協(xié)程循環(huán),接收mailbox pop
的消息,進(jìn)而處理業(yè)務(wù)邏輯,更新自己的數(shù)據(jù)及狀態(tài)。具體實現(xiàn)就是__run()
這個方法。
靜態(tài)方法 configure
用來配置ActorConfig
,只需要在具體的Actor
(如RoomActor
)去重寫這個方法就行。
關(guān)于ActorConfig
具體屬性可以看下邊ActorConfig
部分。
幾個虛擬方法
以下幾個虛擬方法需要在Actor
子類中實現(xiàn),這幾個方法被用在__run()
中來完成Actor
的運行周期。
onStart() 在協(xié)程開啟前執(zhí)行,你可以在此進(jìn)行Actor
初始化的一些操作,比如獲取房間的基礎(chǔ)屬性等。
onMessage() 當(dāng)接收到消息時執(zhí)行,一個Actor
實例的生命周期基本上就是在收消息-處理-發(fā)消息,你需要在這里對消息進(jìn)行解析處理。
onExit() 當(dāng)接收到退出命令時執(zhí)行。比如你希望在一個Actor
實例退出的時候,同時通知某些關(guān)聯(lián)的其他Actor
,可以在此處理。
其它
exit() 用于實例自己退出操作,會向自己發(fā)一條退出的命令。
tick()、after() 兩個定時器,用于Actor
實例的定時任務(wù),比如游戲房間的定時刷怪(tick)
;掉線后多長時間自動踢出(after)
。
static client() 用于創(chuàng)建一個ActorClient
來進(jìn)行對應(yīng)Actor
(實例)的通訊。
ActorClient
Actor
通訊客戶端,調(diào)用xxActor::client()
來創(chuàng)建一個ActorClient
進(jìn)行Actor
通訊。
上邊已經(jīng)大概講過了Actor
的通訊流程,本質(zhì)就是TcpClient->ProxyProcess->UnixClient->ActorWorkerProcess->xxActor
。
看下它實現(xiàn)了哪些方法:
create() 創(chuàng)建一個xxActor
實例,返回actorId
,在之后你可以使用這個actorId
與此實例進(jìn)行通訊。
send() 指定actorId
,向其發(fā)送消息。
exit() 通知xxActor
退出指定actorId
的實例。
sendAll() 向所有的xxActor
實例發(fā)送消息。
exitAll() 退出所有xxActor
實例。
exist() 當(dāng)前是否存在指定actorId
的xxActor
實例。
status() 當(dāng)前ActorWorker
下xxActor
的分布狀態(tài)。
ActorConfig
具體Actor
的配置項,比如RoomActor
、PlayerActor
都有自己的配置。
actorName 一般用類名就可以,注意在同一個服務(wù)中這個是不能重復(fù)的。
actorClass 在Actor->register()
會將對應(yīng)的類名寫入。
workerNum 為Actor
開啟幾個進(jìn)程,Actor->attachServer()
時會根據(jù)這個參數(shù)為相應(yīng)Actor
啟動WorkerNum
個Worker
進(jìn)程。
ActorNode
上邊提到過,xxActor::client($node)
,這個$node
就是ActorNode
對象,屬性為Ip
和Port
,用于尋址Proxy
。
WorkerConfig
WorkerProcess
的配置項,WorkerProcess
啟動時用到。
workerId worker
進(jìn)程Id
,create Actor
的時候用于生成actorId
machineId worker
進(jìn)程機器碼,create Actor
的時候用于生成actorId
trigger 異常觸發(fā)處理接口
WorkerProcess
Actor
的重點在這里,每個注冊的Actor
(類)會啟動相應(yīng)數(shù)量的WorkerProcess
。
比如你注冊了RoomActor
、PlayerActor
,workerNum
都配置的是3,那么系統(tǒng)將啟動3個RoomActor
的Worker
進(jìn)程和3個PlayerActor
的Worker
進(jìn)程。
每個WorkerProcess
維護一個ActorList
,你通過client()->create()
的Actor
將分布在不同Worker
進(jìn)程里,由它的ActorList
進(jìn)行管理。
WorkerProcess
通過協(xié)程接收client
(這個client
就是Proxy
做轉(zhuǎn)發(fā)時的UnixClient
)消息,區(qū)分消息類型,然后分發(fā)給對應(yīng)的Actor
實例。
請仔細(xì)閱讀下WorkerProcess
的源碼,它繼承于AbstractUnixProcess
。
UnixClient
UnixStream Socket
,自行了解。Proxy
轉(zhuǎn)發(fā)消息給本機Actor
所使用的Client
。
Protocol
數(shù)據(jù)封包協(xié)議。
ProxyCommand
消息命令對象,Actor2
將不同類型的消息封裝成格式化的命令,最終傳給WorkerProcess
。
你可以在ActorClient
中了解一下方法和命令的對應(yīng)關(guān)系,但這個不需要在業(yè)務(wù)層去更改。
ProxyConfig
消息代理的配置項。
actorList 注冊的actor
列表。
machineId 機器碼
tempDir 臨時目錄
trigger 錯誤觸發(fā)處理接口
ProxyProcess
Actor->attachServer()
會啟動proxyNum
個ProxyProcess
。
用于在Actor
實例和WorkerProcess
做消息中轉(zhuǎn)。