企业级PHP异步RabbitMQ协程版客户端 2.0 正式发布

news/2025/1/10 12:49:58 标签: php, rabbitmq, 开发语言

概述

workerman/rabbitmq 是一个异步RabbitMQ客户端,使用AMQP协议。

RabbitMQ是一个基于AMQP(高级消息队列协议)实现的开源消息组件,它主要用于在分布式系统中存储和转发消息。RabbitMQ由高性能、高可用以及高扩展性出名的Erlang语言写成,具有高度的可靠性和可扩展性。它支持多种消息协议,包括AMQP、STOMP、MQTT等,并广泛应用于消息队列、消息中间件等领域。

RabbitMQ允许应用程序通过消息传递进行通信,这使得不同的应用程序可以在不同的语言和操作系统之间进行通信。

RabbitMQ的消息工作机制涉及消息从发送端到接收端的流转过程。在这个过程中,消息首先被发送到交换机(Exchange),然后交换机根据路由规则将消息路由到一个或多个队列(Queue)中。消费者(Consumer)从队列中获取消息并进行处理。

生产者和消费者

安装

composer require workerman/rabbitmq

消费者

receive.php

<?php

declare(strict_types=1);

use Bunny\Channel;
use Bunny\Message;
use Workerman\Worker;
use Workerman\RabbitMQ\Client;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->eventLoop = \Workerman\Events\Revolt::class;

$worker->onWorkerStart = function() {
    // Create RabbitMQ Client
    $client = Client::factory([
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'heartbeat' => 60,
        'heartbeat_callback' => function () {
            echo " [-] coroutine-consumer-heartbeat\n";
        },
        'interval' => [100, 300]
    ])->connect();
    $channel = $client->channel();
    $channel->queueDeclare('hello-coroutine');

    // Consumer
    $channel->consume(function (Message $message, Channel $channel, \Bunny\AbstractClient $client) {
        echo " [>] Received ", $message->content, "\n";
    },
        'hello-coroutine',
        '',
        false,
        true
    );
    $client->run();
    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

    // Producer
    \Workerman\Timer::add($interval = 5 , function () use ($channel) {
        $channel->publish($message = 'Hello World By Self Timer. ' . time(), [], '', 'hello-coroutine');
        echo " [<] Sent $message\n";
    });
    echo " [!] Producer timer created, interval: $interval s.\n";

};
Worker::runAll();

运行命令

php receive.php start

基于 Workerman 发布

send.php

<?php

declare(strict_types=1);

use Workerman\RabbitMQ\Client;
use Workerman\Worker;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->eventLoop = \Workerman\Events\Revolt::class;

$worker->onWorkerStart = function() {
    $client = Client::factory([
        'host' => 'host.docker.internal',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'heartbeat' => 60,
        'heartbeat_callback' => function () {
            echo "coroutine-producer-heartbeat\n";
        }
    ])->connect();
    $channel = $client->channel();
    $channel->queueDeclare('hello-coroutine');

    // 每5秒发一个消息
    \Workerman\Timer::add(5, function () use ($channel) {
        $channel->publish($message = 'Hello World By Workerman Env Producer. ' . time(), [], '', 'hello-coroutine');
        echo " [x] Sent '$message'\n";
    });
};
Worker::runAll();

运行命令

php send.php start

基于 PHP-FPM 发布

script.php

<?php

declare(strict_types=1);

use Workerman\RabbitMQ\Client;

require_once __DIR__ . '/vendor/autoload.php';

$client = Client::factory([
    'host' => 'host.docker.internal',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
    'vhost' => '/',
    'heartbeat' => 60,
    'heartbeat_callback' => function () {
        echo "coroutine-producer-heartbeat\n";
    }
])->connect();
$channel = $client->channel();
$channel->queueDeclare('hello-coroutine');
$res = $channel->publish($message = 'Hello World By Normal Producer. ' . time(), [], '', 'hello-coroutine');

echo " [x] Sent '$message', success: $res\n";

运行命令

php script.php

异步消费者

receive.php

<?php

use Bunny\Channel;
use Bunny\Message;
use Workerman\Worker;
use Workerman\RabbitMQ\Client;

require __DIR__ . '/vendor/autoload.php';

$worker = new Worker();

$worker->onWorkerStart = function() {
    (new Client())->connect()->then(function (Client $client) {
        return $client->channel();
    })->then(function (Channel $channel) {
        return $channel->queueDeclare('hello', false, false, false, false)->then(function () use ($channel) {
            return $channel;
        });
    })->then(function (Channel $channel) {
        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
        $channel->consume(
            function (Message $message, Channel $channel, Client $client) {
                echo " [x] Received ", $message->content, "\n";
            },
            'hello',
            '',
            false,
            true
        );
    });
};
Worker::runAll();

运行命令

php receive.php start

异步生产者

send.php

<?php
use Bunny\Channel;
use Bunny\Message;
use Workerman\Worker;
use Workerman\RabbitMQ\Client;

require __DIR__ . '/vendor/autoload.php';

$worker = new Worker();

$worker->onWorkerStart = function() {
    (new Client())->connect()->then(function (Client $client) {
        return $client->channel();
    })->then(function (Channel $channel) {
        return $channel->queueDeclare('hello', false, false, false, false)->then(function () use ($channel) {
            return $channel;
        });
    })->then(function (Channel $channel) {
        echo " [x] Sending 'Hello World!'\n";
        return $channel->publish('Hello World!', [], '', 'hello')->then(function () use ($channel) {
            return $channel;
        });
    })->then(function (Channel $channel) {
        echo " [x] Sent 'Hello World!'\n";
        $client = $channel->getClient();
        return $channel->close()->then(function () use ($client) {
            return $client;
        });
    })->then(function (Client $client) {
        $client->disconnect();
    });
};
Worker::runAll();

运行命令

php send.php start


http://www.niftyadmin.cn/n/5818672.html

相关文章

redis:安装部署、升级以及失败回退

安装部署 一、准备工作 1. 检查系统要求 确保你的服务器满足 Redis 的基本要求: 操作系统:支持的 Linux 发行版(如 Ubuntu, CentOS)内存:至少 4GB(根据实际应用需求调整)CPU:单核或多核 CPU磁盘空间:足够的磁盘空间用于数据存储和日志记录2. 更新系统软件包 在开始…

数据结构大作业——家谱管理系统(超详细!完整代码!)

目录 设计思路&#xff1a; 一、项目背景 二、功能分析 查询功能流程图&#xff1a; 管理功能流程图&#xff1a; 三、设计 四、实现 代码实现&#xff1a; 头文件 结构体 函数声明及定义 创建家谱树头结点 绘制家谱树&#xff08;打印&#xff09; 建立右兄弟…

c语言-----常识问题

1.VS的C4996错误 由于微软在VS2013中不建议再使用C的传统库函数scanf,strcpy,sprintf等&#xff0c;所以直接使用这些库函数会提示C4996错误&#xff1a; VS建议采用带_s的函数&#xff0c;如scanf_s、strcpy_s&#xff0c;但这些并不是标准C函数。 要想继续使用此函数&…

Win32汇编学习笔记09.SEH和反调试

Win32汇编学习笔记09.SEH和反调试-C/C基础-断点社区-专业的老牌游戏安全技术交流社区 - BpSend.net SEH - structed exception handler 结构化异常处理 跟筛选一样都是用来处理异常的,但不同的是 筛选器是整个进程最终处理异常的函数,但无法做到比较精细的去处理异常(例如处理…

ASP.NET CORE 实现微服务 - 分布式事务 - 2PC、3PC、TCC

在微服务体系下&#xff0c;我们的应用被分割成多个服务&#xff0c;每个服务都配置一个数据库。如果我们的服务划分的不够完美&#xff0c;那么为了完成业务会出现非常多的跨库事务。即使按照 DDD 的原则来切分服务还是免不了有的业务场景需要多个业务同时提交成功或者同时回滚…

02.MPLS动态LSP配置实验

MPLS动态LSP配置实验 MPLS动态LSP配置实验基本配置MPLS部分查看LDP会话基本信息查看详细信息查看MPLS动态LSP配置实验 基本配置 接口地址和OSPF配置,省略!全网互通! 注意:8.8.8.8和9.9.9.9也要宣告! MPLS部分 [ar1]mpls lsr-id 1.1.1.1 [ar1]mpls Info: Mpls starting,…

设计模式 行为型 观察者模式(Observer Pattern)与 常见技术框架应用 解析

观察者模式&#xff08;Observer Pattern&#xff09;是一种行为设计模式&#xff0c;它定义了一种一对多的依赖关系&#xff0c;让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时&#xff0c;会通知所有观察者对象&#xff0c;使它们能够自动更新。 一…

【大数据】(选修)实验7 基于Spark的安装和编程实践

实验7 基于Spark的安装和编程实践 1、实验目的 (1)通过实验掌握Spark的安装和配置,基本命令行,以及代码打包提交的sbt方式和Maven方式; (2)掌握使用pySpark进行RDD编程操作,包括基本读写、转换操作和动作操作的RDD编程; 2、实验平台 操作系统:Linux Hadoop版本:…