详解PHP消息队列的实现以及运用(附流程图)

消息队列的概念、原理、实现方式

概念

  • 队列结构的一个中间件
  • 不需要立即消费消息
  • 由消费者或者订阅者进行按顺序消费

基本的流程图如下所示

  • 流程
    ca434f67a2b74693d8a8db235ff9d3d.jpg

应用场景

  • 冗余
  • 解耦
  • 流量削峰
  • 异步通信

实现方式

  • mysql:可靠、速度慢
  • redis:速度快,对于大消息包处理较慢
  • 消息系统:可靠、专业性强

消息的触发机制

  • 死循环的方式,故障时无法及时恢复
  • 定时任务:压力均分、但是处理量有上限
  • 守护进程的方式

解耦 (订单和配送系统)

  • 架构设计1 采用定时任务的方式
    1013f8bafa2312f0f60c2f8253eb0f2.jpg

  • 使用配送处理系统进行处理时,将当前数据库里需要处理的订单状态更新为2,待处理完成后将状态设为1

  • 可以每次指定更新多少条数据

流量削锋 (redis实现秒杀)

  • 使用队列的数据结构

    • lpush/rpush 将数据放入列表中
    • lpop/rpop 将数据移除列表并获取到移除的值
    • ltrim 保留指定区间内的元素
    • llen 获取列表长度
    • lset 通过索引设置列表的值
    • lindex 通过索引获取列表中的值
    • lrange 获取指定范围的元素
  • 图示如下
    03e20c9fdf5009f0ad7383988c42c6f.jpg

  • 代码流程如下

    • 秒杀程序将请求写入redis(uid,time)

    • 检查redis列表存放的长度,超过10个直接舍弃

    • 通过死循环读取redis数据,并存入数据库

      // Spike.php 秒杀程序if(Redis::llen(&#39;lottery&#39;) < 10){
          // 成功
          Redis::lpush(&#39;lottery&#39;, $uid.&#39;%&#39;.microtime());}else{
          // 失败}
      // Warehousing.php 入库程序while(true){
          $user = Redis::rpop(&#39;lottery&#39;);
          if (!$user || $user == &#39;nil&#39;) {
              sleep(2);
              continue;
          }
          $user_arr = explode($user, &#39;%&#39;);
          $insert_user = [
              &#39;uid&#39; => $user_arr[0],
              &#39;time&#39; => $user_arr[1]
          ];
          $res = DB::table(&#39;lottery_queue&#39;)->insert($insert_user);
          if (!$res) {
              Redis::lpush(&#39;lottery&#39;, $user);
          }}
  • 上述代码中假如并发过大的话会存在超卖的情况,此时可以使用文件锁或者redis分布式锁进行控制,先将商品放入redis list中 使用rpop进行取出,如果取不到则说明已经卖完

  • 具体的思路及伪代码如下

      // 先将商品放入redis中
      $goods_id = 2;
    
      $sql = select id,num from goods where id = $goods_id;
      $res = DB::select($sql);
      if (!empty($res)) {
          // 也可以指定多少件
          Redis::del(&#39;lottery_goods&#39; . $goods_id);
          for($i=0;$i<$res[&#39;num&#39;];$i++){
              Redis::lpush(&#39;lottery_goods . $goods_id&#39;, $i);
          }
          LOG::info(&#39;商品存入队列成功,数量:&#39; . Redis::llen(&#39;lottery_goods . $goods_id&#39;));
      } else {
          LOG::info($goods_id . &#39;加入失败&#39;);
      }
      // 开始秒杀
      $count = Redis::rpop(&#39;lottery_goods&#39; . $goods_id);
      if (!$count) {
          // 商品已抢完
          ...
      }
    
      // 用户抢购队列
      $user_list = &#39;user_goods_id_&#39; . $goods_id;
      $user_status = Redis::sismember($user_list, $user_id);
      if ($user_status) {
          // 已抢过
          ...
      }
    
      // 将抢到的放到列表中
      Redis::sadd($user_list, $uid);
      $msg = &#39;用户:&#39; . $uid . &#39;顺序&#39; . $count;
      Log::info($msg);
      // 生成订单等
      ...
      // 减库存
      $sql = update goods set num = num -1 where id = $goods_id and num > 0; // 防止超卖
      DB::update($sql)
      // 抢购成功

rabbitmq

  • 架构及原理
    1fefd7af11b134a7ce893e66a6bd505.jpg
    其中P代表生产者,X为交换机(channal),C代表消费者

  • 简单使用

      // Send.php
      require_once __DIR__.&#39;/vendor/autoload.php&#39;;
    
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
    
      $connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;guest&#39;, &#39;guest&#39;);
    
      // 创建通道
      $channel = $connection->channel();
      // 声明一个队列
      $channel->queue_declare(&#39;user_email&#39;, false, false, false, false);
      // 制作消息
      $msg = new AMQPMessage(&#39;send email&#39;);
      // 将消息推送到队列
      $channel->basic_publish($msg, &#39;&#39;, &#39;user_email&#39;);
    
      echo &#39;[x] send email&#39;;
    
      $channel->close();
      $connection->close();
      // Receive.php
      require_once __DIR__.&#39;/vendor/autoload.php&#39;;
    
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
    
      $connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;guest&#39;, &#39;guest&#39;);
    
      //创建通道
      $channel = $connection->channel();
    
      $channel->queue_declare(&#39;user_email&#39;, false, false, false, false);
    
      // 当收到消息时的回调函数
      $callback = function($msg){
          //发送邮件
          echo &#39;Received &#39;.$msg->body.&#39;\n&#39;;
      };
    
      $channel->basic_consume(&#39;user_email&#39;, &#39;&#39;, false, true, false, false, $callback);
    
      // 保持监听状态
      while($channel->is_open()){
          $channel->wait();
      }

以上就是详解PHP消息队列的实现以及运用(附流程图)的详细内容,更多请关注其它相关文章!