easyswoole - 基于swoole扩展实现的一款高性能php框架

EasySwoole RPC 自定義注冊中心

EasySwoole 默認為通過 UDP 廣播 + 自定義進程定時刷新自身節(jié)點信息的方式來實現(xiàn)無主化/注冊中心的服務(wù)發(fā)現(xiàn)。在服務(wù)正常關(guān)閉的時候,自定義定時進程的onShutdown 方法會執(zhí)行 deleteServiceNode 方法來實現(xiàn)節(jié)點下線。在非正常關(guān)閉的時候,心跳超時也會被節(jié)點管理器踢出。

有些情況,比如服務(wù)都不在一個網(wǎng)段上,由于udp協(xié)議的設(shè)置,將會廣播不到,只能點對點的進行廣播數(shù)據(jù),就不是很方便。那么 EasySwoole 支持你自定義一個節(jié)點管理器,來變更服務(wù)注冊及發(fā)現(xiàn)方式。

下面實現(xiàn)的 Redis 節(jié)點管理器示例是基于 easyswoole/redis-pool 組件 實現(xiàn),所以請先執(zhí)行 composer require easyswoole/redis-pool 安裝 redis-pool 組件。關(guān)于 easyswoole/redis-pool 組件具體用戶請查看 easyswoole/redis-pool 章節(jié)

例如使用 Redis 來實現(xiàn)

<?php

namespace App\RpcServices\NodeManager;

use EasySwoole\Redis\Redis;
use EasySwoole\RedisPool\Pool;
use EasySwoole\RedisPool\RedisPool;
use EasySwoole\Rpc\NodeManager\NodeManagerInterface;
use EasySwoole\Rpc\Server\ServiceNode;

class RedisManager implements NodeManagerInterface
{
    protected $redisKey;

    protected $ttl;

    /**
     * @var Pool $pool
     */
    protected $pool;

    public function __construct(Pool $pool, string $hashKey = 'rpc', int $ttl = 30)
    {
        $this->pool = $pool;
        $this->redisKey = $hashKey;
        $this->ttl = $ttl;
    }

    function getNodes(string $serviceName, ?int $version = null): array
    {
        $fails = [];
        $hits = [];
        $time = time();

        $redisPool = $this->pool;

        /** @var Redis $redis */
        $redis = $redisPool->defer(15);

        try {
            $nodes = $redis->hGetAll("{$this->redisKey}_{$serviceName}");

            $nodes = $nodes ?: [];

            foreach ($nodes as $nodeId => $value) {
                $node = json_decode($value, true);
                if ($time - $node['lastHeartbeat'] > $this->ttl) {
                    $fails[] = $nodeId;
                    continue;
                }
                if ($node['service'] === $serviceName) {
                    if ($version !== null && $version === $node['version']) {
                        $serviceNode = new ServiceNode($node);
                        $serviceNode->setNodeId(strval($nodeId));
                        $hits[$nodeId] = $serviceNode;
                    } else {
                        $serviceNode = new ServiceNode($node);
                        $serviceNode->setNodeId(strval($nodeId));
                        $hits[] = $serviceNode;
                    }
                }
            }
            if (!empty($fails)) {
                foreach ($fails as $failKey) {
                    $this->deleteServiceNode($serviceName, $failKey);
                }
            }
            return $hits;
        } catch (\Throwable $throwable) {
            // 如果該 redis 斷線則銷毀
            $redisPool->unsetObj($redis);
        } finally {
            $redisPool->recycleObj($redis);
        }

        return [];
    }

    function getNode(string $serviceName, ?int $version = null): ?ServiceNode
    {
        $list = $this->getNodes($serviceName, $version);
        if (empty($list)) {
            return null;
        }
        $allWeight = 0;

        $redisPool = $this->pool;;

        /** @var Redis $redis */
        $redis = $redisPool->getObj(15);

        $time = time();

        try {
            foreach ($list as $node) {
                /** @var ServiceNode $nodee */
                $key = $node->getNodeId();
                $nodeConfig = $redis->hGet("{$this->redisKey}_{$serviceName}", $key);
                $nodeConfig = json_decode($nodeConfig, true);
                $lastFailTime = $nodeConfig['lastFailTime'];
                if ($time - $lastFailTime >= 10) {
                    $weight = 10;
                } else {
                    $weight = abs(10 - ($time - $lastFailTime));
                }
                $allWeight += $weight;
                $node->__weight = $weight;
            }
            mt_srand(intval(microtime(true)));
            $allWeight = rand(0, $allWeight - 1);
            foreach ($list as $node) {
                $allWeight = $allWeight - $node->__weight;
                if ($allWeight <= 0) {
                    return $node;
                }
            }
        } catch (\Throwable $throwable) {
            // 如果該 redis 斷線則銷毀
            $redisPool->unsetObj($redis);
        } finally {
            $redisPool->recycleObj($redis);
        }

        return null;
    }

    function failDown(ServiceNode $serviceNode): bool
    {

        $redisPool = $this->pool;;

        /** @var Redis $redis */
        $redis = $redisPool->getObj(15);
        try {
            $serviceName = $serviceNode->getService();
            $nodeId = $serviceNode->getNodeId();
            $hashKey = "{$this->redisKey}_{$serviceName}";
            $nodeConfig = $redis->hGet($hashKey, $nodeId);
            $nodeConfig = json_decode($nodeConfig, true);
            $nodeConfig['lastFailTime'] = time();
            $redis->hSet($hashKey, $nodeId, json_encode($nodeConfig));
            return true;
        } catch (\Throwable $throwable) {
            // 如果該 redis 斷線則銷毀
            $redisPool->unsetObj($redis);
        } finally {
            $redisPool->recycleObj($redis);
        }

        return false;
    }

    function offline(ServiceNode $serviceNode): bool
    {

        $redisPool = $this->pool;;

        /** @var Redis $redis */
        $redis = $redisPool->getObj(15);
        try {
            $serviceName = $serviceNode->getService();
            $nodeId = $serviceNode->getNodeId();
            $hashKey = "{$this->redisKey}_{$serviceName}";
            $redis->hDel($hashKey, $nodeId);
            return true;
        } catch (\Throwable $throwable) {
            // 如果該 redis 斷線則銷毀
            $redisPool->unsetObj($redis);
        } finally {
            $redisPool->recycleObj($redis);
        }

        return false;
    }

    function alive(ServiceNode $serviceNode): bool
    {
        $info = [
            'service' => $serviceNode->getService(),
            'ip' => $serviceNode->getIp(),
            'port' => $serviceNode->getPort(),
            'version' => $serviceNode->getVersion(),
            'lastHeartbeat' => time(),
            'lastFailTime' => 0
        ];

        $redisPool = $this->pool;;

        /** @var Redis $redis */
        $redis = $redisPool->getObj();

        try {
            $serviceName = $serviceNode->getService();
            $nodeId = $serviceNode->getNodeId();
            $hashKey = "{$this->redisKey}_{$serviceName}";
            $redis->hSet($hashKey, $nodeId, json_encode($info));
            return true;
        } catch (\Throwable $throwable) {
            // 如果該 redis 斷線則銷毀
            $redisPool->unsetObj($redis);
        } finally {
            $redisPool->recycleObj($redis);
        }

        return false;
    }

    private function deleteServiceNode($serviceName, $failKey): bool
    {
        $redisPool = $this->pool;;

        /** @var Redis $redis */
        $redis = $redisPool->getObj(15);
        try {
            $redis->hDel("{$this->redisKey}_{$serviceName}", $failKey);
            return true;
        } catch (\Throwable $throwable) {
            $redisPool->unsetObj($redis);
        } finally {
            $redisPool->recycleObj($redis);
        }

        return false;
    }
}
 /** @var \EasySwoole\Rpc\Config $config */
$assistConfig = $config->getAssist();

// 服務(wù)定時自刷新到節(jié)點管理器
$assistConfig->setAliveInterval(5000);

即使關(guān)閉了 UDP 定時廣播,EasySwoole RpcAssistWorker 進程依舊會每 5 秒執(zhí)行一次 serviceAlive 用于更新自身的節(jié)點心跳信息。

注冊

<?php

namespace EasySwoole\EasySwoole;

use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\EasySwoole\Swoole\EventRegister;use EasySwoole\Redis\Config\RedisConfig;use EasySwoole\RedisPool\Pool;use EasySwoole\RedisPool\RedisPool;

class EasySwooleEvent implements Event
{
    public static function initialize()
    {
        date_default_timezone_set('Asia/Shanghai');
    }

    public static function mainServerCreate(EventRegister $register)
    {
        ###### 注冊 rpc 服務(wù) ######
        /** rpc 服務(wù)端配置 */
        // 采用了redis 節(jié)點管理器 可以關(guān)閉udp 廣播了。
        $redisM = new RedisManager(new Pool(new RedisConfig(['host' => '127.0.0.1'])));
        $config = new \EasySwoole\Rpc\Config($redisM);
        $config->setNodeId('EasySwooleRpcNode1');
        $config->setServerName('EasySwoole'); // 默認 EasySwoole
        $config->setOnException(function (\Throwable $throwable) {

        });

        $serverConfig = $config->getServer();
        $serverConfig->setServerIp('127.0.0.1');

        // rpc 具體配置請看配置章節(jié)
        $rpc = new \EasySwoole\Rpc\Rpc($config);

        // 創(chuàng)建 Goods 服務(wù)
        $goodsService = new \App\RpcServices\Goods();
        // 添加 GoodsModule 模塊到 Goods 服務(wù)中
        $goodsService->addModule(new \App\RpcServices\GoodsModule());
        // 添加 Goods 服務(wù)到服務(wù)管理器中
        $rpc->serviceManager()->addService($goodsService);

        // 創(chuàng)建 Common 服務(wù)
        $commonService = new \App\RpcServices\Common();
        // 添加 CommonModule 模塊到 Common 服務(wù)中
        $commonService->addModule(new \App\RpcServices\CommonModule());
        // 添加 Common 服務(wù)到服務(wù)管理器中
        $rpc->serviceManager()->addService($commonService);

        // 此刻的rpc實例需要保存下來 或者采用單例模式繼承整個Rpc類進行注冊 或者使用Di

        // 注冊 rpc 服務(wù)
        $rpc->attachServer(ServerManager::getInstance()->getSwooleServer());

    }
}
主站蜘蛛池模板: 青州东威机械有限公司,洗沙机,脱水筛、细沙回收机,淘金设备,洗石机,砂石分离机,筛沙机,采沙船,清淤船,破碎制砂机,海沙淡化设备 | 山东健泽医疗科技有限公司-官网_深呼吸训练器_肋骨固定板_放疗定位膜耗材生产厂家招商 | 小程序定制,小程序开发,北京小程序公司,网站建设,网站制作,北京网站建设,北京网站制作 | 江寒必恋术在线阅读_江寒必恋术免费下载 - 江寒必恋术电子书 | 智能仓储货架厂家 - 汇峰仓储| 饲料车_散装饲料车_畜禽运输车_散装饲料运输车_饲料车厂家_铝合金运猪车-程力专用汽车股份有限公司 | 呼吸家官网|肺功能检测仪生产厂家|国产肺功能仪知名品牌|肺功能检测仪|肺功能测试仪|婴幼儿肺功能仪|弥散残气肺功能仪|肺功能测试系统|广州红象医疗科技有限公司|便携式肺功能仪|大肺功能仪|呼吸康复一体机|儿童肺功能仪|肺活量计|医用简易肺功能仪|呼吸康复系统|肺功能仪|弥散肺功能仪(大肺)|便携式肺功能检测仪|肺康复|呼吸肌力测定肺功能仪|肺功能测定仪|呼吸神经肌肉刺激仪|便携式肺功能 | 液化气密度测定仪,原油有机氯测定仪-泰州市姜堰分析仪器厂 | 潍坊亿宏重工机械有限公司,破碎机,高性能立磨机,颚式破碎机,锤式破碎机反击式破碎机,重锤式破碎机,高性能反击式破碎机,圆锥式破碎机,给料机系列,链板给料机系列,简易给料机系列,振动给料机 | 造型松|造型黑松|油松|泰山松-莱芜市盛世园林苗木专业合作社 | 启东华立石油化工机械设备有限公司|过滤器|混合机|消声器|混合器|管道过滤器|空气过滤器|精细过滤器 | 雾度计-雾度仪-透光率测试仪-3nh品牌雾度仪生产厂家 | 四合扣-工字扣-帽钉(831,200,警用,大拉力四合扣)-永嘉县鑫达钮扣有限公司 | 永康微网站建设、永康手机网站建设、永康营销型网站建设、永康外贸网站建设、永康网站托管、永康网络公司—英汇网络 - 永康市英汇网络技术有限公司 | 欧洲_西班牙进口_燃木真火壁炉集成服务商_燃木壁炉官方网站.hergom | 重庆教师资格网-重庆教师资格证考试网 | 啤酒厂家_啤酒代工厂_原浆啤酒厂家 - 山东十谷啤酒有限公司 | 河北安润防腐管业股份有限公司-远程供液管路_远程供液_远程供液系统 | 天木生物科技有限公司-高通量自动化-细胞筛选平台 | 上海慧泰仪器制造有限公司_一体型马弗炉-可控真空干燥箱-强光稳定性试验箱 | 宁波管道安装_宁波工业冷风机_宁波冷风机厂家_宁波厂房通风降温_「浙江甬风机电」 | 聚丙烯酰胺,聚合氯化铝,重金属捕捉剂,污泥调理剂,活性氧化铝,生石灰,反渗透阻垢剂,工业葡萄糖,硫酸铝,果壳活性炭,柱状活性炭,蜂窝活性炭,石英砂,锰砂-北京雁归来环保科技有限公司-以真诚为立足之本,以质量为生存之本,愿与海内外同仁共创双赢。雁归来人一路走来,气贯长虹,勇锐盖过怯弱,进取压倒苟安!我们紧扣时代脉搏,专注水处理、继往开来! | 智能访客系统 - 来访登记系统_微信预约系统_人员出入管理系统_访客机_人脸识别系统门禁闸机 | 砂金设备-淘金机械-金矿选矿设备厂家-青州冠诚重工机械有限公司 砂浆生产线_干混砂浆设备_干混砂浆生产线-苏州一工机械有限公司 | 莫非传媒官网-江西知名的网络营销推广服务平台南昌网络公司,专业网络公关,品牌危机处理,网站SEO优化,微信朋友圈广告,网站建设,南昌莫非文化传媒有限公司 | 华为交换机及防火墙-H3C无线AP-鸿远腾达华为交换机总代理商 | 武汉东湖高新集团股份有限公司官网| 南昌今工科技有限公司 | 球墨井盖厂家-铸铁井盖批发-雨水篦子生产厂家-安徽含山县林头新华铸造厂 | 汽车漆|汽车油漆|工业油漆涂料|汽车漆加盟-佛山市科涂涂料有限公司 | 软化水设备_锅炉软水设备_全自动软化设备【安装维护、更换树脂】-青岛水处理设备厂家 | 蓝禹太阳能蓄电池 风能储能胶体铅酸电池-扬州东泰电源有限公司 | 制冷加热循环机,密闭高低温一体机,优质密闭低温冷冻机-无锡冠亚恒温制冷技术有限公司 | 中国建材信息总网-中国建材行业权威的信息资讯平台 | 信管飞软件官网 - 亚拓软件旗下精细化管理软件、进销存管理软件、混凝土ERP、通风设备ERP、风管报价软件、出纳软件、送货单打印软件、ERP软件等免费下载 | 廊坊微信营销,廊坊小程序开发,廊坊APP开发(安卓_苹果ios开发),微信朋友圈广告,百度推广,廊坊网络公司品牌服务商-河北盛秋网络科技有限公司 | 吸气式感烟火灾探测器|极早期烟雾系统|空气采样报警|拓普兰 | 武汉办公室装修公司-展厅装修_湖北今泰装饰工程有限公司官网_今泰装饰 | 威海华东数控股份有限公司 | 湖南净声源环保科技有限公司是一家专业从事噪声治理和建筑声学设计生态环境综合治理服务的企业,专业从事株洲电梯隔音治理,湘潭中央空调降噪处理,衡阳邵阳冷却塔噪音治理,岳阳常德大型风机噪声隔音降噪,张家界空压机噪声治理,益阳配电房变压器噪声治理,专业郴州永州工厂企业车间噪声治理,怀化娄底专业机械设备减振降治理,武汉噪音治理隔音降噪公司,孝感噪音治理,立式球磨机的噪声控制,专业隔音降噪公司,、以及各类机械动力设备减振降噪噪声治理的公司,同时为客户提供咨询与解决方案 | 小耳朵电源_安防监控电源|小耳朵官网|电源适配器|摄像机电源|开关电源|小耳朵监控电源 |