Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

easyswoole 使用php-amqplib/php-amqplib 进行自定义消费会出现mysql连接断开 #534

Open
645263 opened this issue Mar 20, 2022 · 6 comments

Comments

@645263
Copy link

645263 commented Mar 20, 2022

软件版本:
easyswoole: 3.5.1
php-amqplib/php-amqplib:3.1.2
swoole: 4.4.23

问题出现时机:
1、项目运行1天样子基本就会出现 SQLSTATE[HY000] [2006] MySQL server has gone away ,一旦自定义进程出现这个报错,必须重启进程才可以解决,
2、在项目消费者里面已经尝试加了 \Co::sleep(0.01);,并未解决mysql 链接断开的问题
一旦自定义进程报 MySQL server has gone away,后面的消费 依然是继续报:
Connection reset by peer or Transport endpoint is not connected

群里网名“ 那就这样吧” 的开发者也遇到同样的问题

目前的解决办法:
1、捕获异常,重新入列,然后重启进程

希望官方解决一下这个问题,感谢

自定义进程代码:

<?php
namespace App\Process\RabbitMqConsumer;
use App\HttpController\ShopApi\Service\ClientService;
use App\Models\VipPayOrderModel;
use App\Service\LogService;
use App\Service\MqFanoutService;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\EasySwoole\Logger;
use EasySwoole\RabbitMq\MqJob;
use EasySwoole\RabbitMq\MqQueue;
use EasySwoole\RabbitMq\RabbitMqQueueDriver;

/**
 * mq广播消费进程
 * Class ClientRegisterFinishProcess
 * @package App\Process
 */
class RabbitMqConsumerProcess extends AbstractProcess
{
    protected function run($arg)
    {
        $queueName  = 'WORK_QUEUE';
        $config     = \EasySwoole\EasySwoole\Config::getInstance()->getConf("rabbitmq");
        $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password']);
        $channel    = $connection->channel();
        $channel->queue_declare($queueName, false, true, false, false);
        $callback = function ($msg) use ($channel){
            $headersObject = $msg->get_properties()['application_headers'];
            $headersArray  = $headersObject->getNativeData();
            $body = json_decode($msg->body, true);
            $tag = $body['desc'];
            $msgId = $body['msgId'];
            $listenerKey = $body['listenerKey'];
            try {
                if ($listenerKey){
                    \Co::sleep(0.01);
                    \EasySwoole\EasySwoole\Task\TaskManager::getInstance()->sync(function () use ($body){
                        WorkQueueService::handler($body);
                    });
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                }
            }catch (\Throwable $e){
                LogService::updateRabbitMqLog($msgId,'RabbitMqQueueConsumerProcess 消费异常:'.$e->getMessage(),2);
                if($headersArray['retry'] > $headersArray['maxRetry']){
                    Logger::getInstance()->error("{$tag}-达到最大重试次数!,停止重试,参数={$msg->body},errMsg=".$e->getMessage());
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                    //todo 这里需要写db
                    return;
                }
                Logger::getInstance()->error("{$tag}-工作队列处理异常,开始重试!,参数={$msg->body},errMsg=".$e->getMessage());
                $headersArray['retry']++;
                Logger::getInstance()->waring("{$tag}-消息ID:".$body['msgId'].' 第'.$headersArray['retry'].'次失败,消息重新入队');
                $exchange      = $msg->getExchange();
                $routingKey    = $msg->getRoutingKey();
                $body          = $msg->getBody();
                \Co::sleep(5);
                $msg->delivery_info['channel']->basic_publish(
                    new AMQPMessage($body,['application_headers'=>new AMQPTable($headersArray)]),
                    $exchange,
                    $routingKey
                );
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            }
        };
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }

    protected function onException(\Throwable $throwable, ...$args)
    {
        Logger::getInstance()->error("广播队列异常:".$throwable->getMessage());
        Logger::getInstance()->error("广播队列异常trace:".$throwable->getTraceAsString());
        parent::onException($throwable, $args);
    }

    protected function onShutDown()
    {
        Logger::getInstance()->waring("RabbitMqConsumerProcess 进程退出了");
        parent::onShutDown(); // TODO: Change the autogenerated stub
    }
}
@function-gy
Copy link

function-gy commented Mar 22, 2022

是因为rabbitmq 中的订阅消息模式是采用的while true死循环,导致mysql链接池无法回收process中的链接,如果队列中在一定时间内不进行sql操作,导致链接空闲时间达到数据库空闲链接时间上限,mysql会主动断开此链接,由于ORM未进行重连机制,所以会一段时间没有操作后出现此报错 SQLSTATE[HY000] [2006] MySQL server has gone away,需要更换mysql 操作工具,可使用此包,composer require simple-swoole/db(包内部采用了短线重连机制,并实现了swoole自带链接池),使用process的时候尽量不进行调用外部类方法,redis尽量也使用独立的链接(可采用REDIS扩展)

整体集成方案可参考此包
https://gitee.com/orange-studio/simple-handle

@645263
Copy link
Author

645263 commented Mar 22, 2022

是因为rabbitmq 中的订阅消息模式是采用的while true死循环,导致mysql链接池无法回收process中的链接,如果队列中在一定时间内不进行sql操作,导致链接空闲时间达到数据库空闲链接时间上限,mysql会主动断开此链接,由于ORM未进行重连机制,所以会一段时间没有操作后出现此报错 SQLSTATE[HY000] [2006] MySQL server has gone away,需要更换mysql 操作工具,可使用此包,composer require simple-swoole/db(包内部采用了短线重连机制,并实现了swoole自带链接池),使用process的时候尽量不进行调用外部类方法,redis尽量也使用独立的链接(可采用REDIS扩展)

整体集成方案可参考此包 https://gitee.com/orange-studio/simple-handle

谢谢,我目前使用的解决方案是
自己监听异常,当监听到 SQLSTATE 后就直接close rabbitmq 消费者链接,然后再kill 到当前进程,easyswoole 会重启进程,然后继续完成消费,如果现在去改的话 有太多代码要改

@function-gy
Copy link

队列注意ACK机制,防止消息丢失,其他问题不大

@645263
Copy link
Author

645263 commented Mar 22, 2022

队列注意ACK机制,防止消息丢失,其他问题不大

我是采用手动ack,就是必须执行完成后我才提交ack,这样进程下次启动,消息依然会被消费,直到正常执行完成


} catch (\Throwable $e) {
                    $message = $e->getMessage();
                    Logger::getInstance()->error("延时队列消费异常:" . $message);
                    Logger::getInstance()->error("延时队列消费异常:" . $e->getTraceAsString());
                    $isKillProcess = false;
                    if (strrpos(strtoupper($message), 'SQLSTATE') !== false) {
                        \Co::sleep(1);
                        Logger::getInstance()->error("数据库异常:" . $message);
                        $message = $message . ',重启进程';
                        $isKillProcess = true;
                    }
                    // 邮件通知管理员
                    $title = '延时队列消费异常:' . $message;
                    $body = '数据='.$msg->body.',异常信息:'.$e->getTraceAsString();
                    $form  = [
                        'recipient' => '',
                        'title'     => $title,
                        'body'      => $body,
                        'cc'        => '',
                        'bcc'       => ''
                    ];
                    // 重启进程
                    MqComposer::workQueue('SEND_EMAIL', $form, '异常邮件发送');
                    if ($isKillProcess == true) {
                        $channel->close();
                        $connection->close();
                        \Co::sleep(1);
                        $cmd = "php easyswoole process kill --pid={$pid}";
                        Logger::getInstance()->error("已完成进程重启cmd:\r\n" . $cmd);
                        shell_exec($cmd);
                    }
                }

@function-gy
Copy link

官方一直没解决这个问题 也没办法

@XueSiLf
Copy link
Collaborator

XueSiLf commented May 13, 2022

可以使用非连接池模式,用 easyswoole/mysqli 组件即可,当需要使用数据库时才进行连接mysql并操作数据

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants