Commit df9de770 authored by wangbiaoyan's avatar wangbiaoyan
Browse files

新增消费方法

No related merge requests found
Showing with 39 additions and 0 deletions
+39 -0
......@@ -42,6 +42,12 @@ abstract class MqConsumeInterface
{
}
/**
* 版本号
* @var string $version
*/
protected $version = "";
/**
* 消费后置动作
* @return mixed
......@@ -108,4 +114,37 @@ abstract class MqConsumeInterface
}
}
}
public function handleConsumeV1()
{
$params = [
"exchangeName" => $this->exchangeName,
"exchangeType" => $this->exchangeType,
"queueName" => $this->queueName,
"routeKey" => $this->routeKey,
"callback" => [ $this, "consumeCallback" ],
"object" => $this
];
$consumeParamBean = new ConsumeParamBean($params);
switch (config("queue.connections.default")) {
case "rabbitmq":
$rabbitMq = RabbitMq::getInstance();
//声明交换机
$rabbitMq->declareExchange($consumeParamBean->getExchangeName(), $consumeParamBean->getExchangeType());
//声明队列
$rabbitMq->declareQueue($consumeParamBean->getQueueName(), $consumeParamBean->getRouteKey());
break;
default:
throw new MqException("未知消息队列");
}
while (true) {
try {
$rabbitMq->getQueue()->consume([ $this, "consumeCallback" ]);
} catch (\Throwable $exception) {
$this->exceptionHandle($exception);
}
}
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment