Freezing consumer after few minutes #60

Open
yiiman-dev opened this issue Jun 14, 2022 · 2 comments

yiiman-dev commented Jun 14, 2022

Hi everyone,

First of all, thanks for this useful repository.

I have a problem with the consumer connection.

It stops working after a few minutes, with no error and no output in the console.

I am running Supervisor in Docker.

My Supervisor config:


[supervisord]
logfile=/dev/stdout
logfile_maxbytes=0
logfile_backups=0
loglevel=info
nodaemon=true


[program:php-fpm]
command=php-fpm -F -y /usr/local/etc/php-fpm.conf -c /usr/local/etc/php/conf.d/local.ini -R
autorestart=true
stdout_events_enabled=true
stderr_events_enabled=true
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
stdout_logfile=/dev/stdout
stderr_logfile=/dev/stderr

[program:consume]
command=php /home/www/app/yii rabbitmq/consume shahkar
autorestart=true
stdout_events_enabled=true
stderr_events_enabled=true
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
stdout_logfile=/dev/stdout
stderr_logfile=/dev/stderr

My rabbit config:

 [
    //Config help:: https://github.com/mikemadisonweb/yii2-rabbitmq
    'class'             => \mikemadisonweb\rabbitmq\Configuration::class,
    'auto_declare'      => true,
    'connections'       =>
        [
            [
                'name'     => 'rabbit',
                // You can pass these parameters as a single `url` option: https://www.rabbitmq.com/uri-spec.html
                'host'     => ENV::get('RABBIT_HOST'),
                'port'     => ENV::get('RABBIT_PORT'),
                'user'     => ENV::get('RABBIT_USER'),
                'password' => ENV::get('RABBIT_PASSWORD'),
                'vhost'    => ENV::get('RABBIT_VHOST'),

                'type'                => AMQPLazyConnection::class,
                'url'                 => null,
                'connection_timeout'  => 3,
                'read_write_timeout'  => 3,
                'ssl_context'         => null,
                'keepalive'           => false,
                'heartbeat'           => 0,
                'channel_rpc_timeout' => 0.0
            ]
            // When multiple connections is used you need to specify a `name` option for each one and define them in producer and consumer configuration blocks
        ],
    'exchanges'         =>
        [
            [
                'name'        => 'd_exchange',
                'type'        => 'direct',
                'passive'     => false,
                'durable'     => true,
                'auto_delete' => false,
                'internal'    => false,
                'nowait'      => false,
                'arguments'   => null,
                'ticket'      => null,
            ],
        ],
    'queues'            =>
        [
            [
                'name'        => 'sms',
                'passive'     => false,
                'durable'     => true,
                'exclusive'   => false,
                'auto_delete' => false,
                'nowait'      => false,
                'arguments'   => null,
                'ticket'      => null,
                // Queue can be configured here the way you want it:
                //'durable' => true,
                //'auto_delete' => false,
            ],
            [
                'name'        => 'log',
                'passive'     => false,
                'durable'     => true,
                'exclusive'   => false,
                'auto_delete' => false,
                'nowait'      => false,
                'arguments'   => null,
                'ticket'      => null,
            ],
            [
                'name'        => 'nationalID',
                'passive'     => false,
                'durable'     => true,
                'exclusive'   => false,
                'auto_delete' => false,
                'nowait'      => false,
                'arguments'   => null,
                'ticket'      => null,
            ]
        ],
    'bindings'          =>
        [
            [
                'exchange'     => 'd_exchange',
                'queue'        => 'sms',
                'to_exchange'  => null,
                'routing_keys' => ['sms'],
            ],
            [
                'exchange'     => 'd_exchange',
                'queue'        => 'log',
                'to_exchange'  => null,
                'routing_keys' => ['log'],
            ],

        ],
    'producers'         =>
        [
            [
                'name'          => 'producer',
                'connection'    => 'rabbit',
                'safe'          => true,
                'content_type'  => 'text/plain',
                'delivery_mode' => 2,
                'serializer'    => 'serialize',
            ],

            [
                'name'          => 'log',
                'connection'    => 'rabbit',
                'safe'          => true,
                'content_type'  => 'text/plain',
                'delivery_mode' => 2,
                'serializer'    => 'serialize',
            ],


        ],
    'consumers'         =>
        [
            [
                'name'                   => 'shahkar',
                // Every consumer should define one or more callbacks for corresponding queues
                'connection'             => 'rabbit',
                'qos'                    => [
                    'prefetch_size'  => 0,
                    'prefetch_count' => 0,
                    'global'         => false,
                ],
                'idle_timeout'           => 0,
                'idle_timeout_exit_code' => null,
                'proceed_on_exception'   => false,
                'deserializer'           => 'unserialize',
                'callbacks'              =>
                    [
                        // queue name => callback class name
                        'sms' => ShahkarConsumer::class,
                    ],
            ],
        ],
    'on before_consume' => function ($event) {
        if (\Yii::$app->has('db') && \Yii::$app->db->isActive) {
            try {
                \Yii::$app->db->createCommand('SELECT 1')->query();
            } catch (\yii\db\Exception $exception) {
                \Yii::$app->db->close();
            }
        }
    },
    'logger'            => [
        'log'           => true,
        'category'      => 'application',
        'print_console' => true,
        'system_memory' => true,
    ],
]

My consumer class:

namespace app\consumers;

use app\base\BaseConsumer;
use app\models\ModuleCorporates;
use app\models\ModuleTemp;
use Faker\Factory;
use Faker\Provider\fa_IR\Address;
use Faker\Provider\fa_IR\Person;
use Kavenegar\KavenegarApi;
use mikemadisonweb\rabbitmq\components\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class ShahkarConsumer extends BaseConsumer implements ConsumerInterface
{

    /**
     * @inheritDoc
     */
    public function execute(AMQPMessage $msg)
    {
        $data = json_decode($msg->body);

        /**
         * Data format for `request_corporate_details` is : Object
         *[
         *  'mode'                  => 'request_corporate_details',
         *  'corporate_national_id' => '.........STRING..........',
         *  'ceo_nation_code'       => '.........STRING..........',
         *  'ceo_uid'               => '..........INT.............'
         *]
         */
        try {
            $faker = Factory::create();
            // Faker providers expect the generator as a constructor argument
            $person = new Person($faker);
            $model = new ModuleTemp();
            $fields =
                [
                    'name'                     => $person->name(),
                    'operating_license_number' => $faker->numberBetween(1111111, 9999999),
                    'year_of_operation'        => $faker->numberBetween(1100, 1350),
                    'description'              => $faker->text(255),
                    'address'                  => $faker->address,
                    'phone'                    => $faker->phoneNumber,
                    'economic_code'            => $faker->numberBetween(1111111111, 21399999999),
                    'legal_mode'               => $faker->numberBetween(1, 3),
                    'site'                     => $faker->url,
                    'field_id'                 => null,
                    'category_id'              => null,
                    'ceo_uid'                  => $data->ceo_uid,
                    'region_id'                => null,
                    'province_id'              => null,
                    'city_id'                  => null,
                    'national_id'              => $data->national_id,
                    'ceo_name'                 => $faker->name,
                    'ceo_phone'                => $faker->phoneNumber,
                    'ceo_national_code'        => $person::nationalCode(),
                    'postal_code'              => Address::postcode(),
                    'production_plan'          => null,
                ];
            $model->key = 'corp_'.$data->ceo_uid;
            $model->value = json_encode($fields);
            $model->created_at = date('Y-m-d H:i:s');
            $model->age = 10800;
            $model->save();
        } catch (\Exception $e) {
            $this->logException($e);
            return ConsumerInterface::MSG_REQUEUE;
        }
        return ConsumerInterface::MSG_ACK;
    }
}

Output of top on Linux after a few minutes, once the connection has frozen:

Mem: 7993764K used, 173456K free, 83884K shrd, 1878812K buff, 3713880K cached
CPU:   3% usr   1% sys   0% nic  95% idle   0% io   0% irq   0% sirq
Load average: 0.16 0.15 0.12 3/706 31
  PID  PPID USER     STAT   VSZ %VSZ CPU %CPU COMMAND
   21    17 www-data S     169m   2%   0   0% php-fpm: pool www
   22    17 www-data S     169m   2%   2   0% php-fpm: pool www
   17     7 root     S     169m   2%   2   0% php-fpm: master process (/usr/local/etc/php-fpm.conf)
    7     1 root     S     138m   2%   1   0% supervisord -c /home/www/.deploy/config/supervisor.conf
   16     7 root     S    47100   1%   1   0% php /home/www/app/yii rabbitmq/consume sms
   25     0 root     S     1676   0%   1   0% /bin/sh
    1     0 root     S     1608   0%   3   0% sh /entrypoint.sh
   31    25 root     R     1604   0%   3   0% top

After a few minutes the consumer process is still running, but in fact it is frozen and no longer processes messages.

Any idea?

@arlandantas

Hi @yiiman-dev.
The same thing happened to me.

I solved it by setting the "heartbeat" and "read_write_timeout" fields in the connection array to a value greater than 0 (I used 60 for both).

@michaelarnauts

Note that setting both heartbeat and read_write_timeout to the same value can cause a race condition where the connection is closed just before a heartbeat arrives.

It seems to be recommended to use 60 seconds as heartbeat and 130 seconds as read_write_timeout. This is actually the default of php-amqplib. See php-amqplib/php-amqplib#648
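
For reference, here is a minimal sketch of how the connection entry from the original post might look with those values applied. Only heartbeat and read_write_timeout are changed. getenv() stands in for the poster's ENV::get() helper, and the $connection variable name is only for illustration. As far as I know, newer php-amqplib releases also refuse a read_write_timeout that is not more than twice the heartbeat, which would be another reason to prefer 130 over 60, but please check that against the version you have installed.

```php
<?php

use PhpAmqpLib\Connection\AMQPLazyConnection;

// Sketch of one entry of the 'connections' list from the config above.
// Assumption: getenv() replaces the ENV::get() helper used in the original post.
$connection = [
    'name'     => 'rabbit',
    'host'     => getenv('RABBIT_HOST'),
    'port'     => getenv('RABBIT_PORT'),
    'user'     => getenv('RABBIT_USER'),
    'password' => getenv('RABBIT_PASSWORD'),
    'vhost'    => getenv('RABBIT_VHOST'),

    'type'                => AMQPLazyConnection::class,
    'url'                 => null,
    'connection_timeout'  => 3,
    // Changed from 3: kept above 2 x heartbeat so a heartbeat can always arrive in time
    'read_write_timeout'  => 130,
    'ssl_context'         => null,
    'keepalive'           => false,
    // Changed from 0 (disabled): lets both sides notice a silently dropped connection
    'heartbeat'           => 60,
    'channel_rpc_timeout' => 0.0,
];
```

The entry goes into the 'connections' list of the component config. The consumer has to be restarted under Supervisor before the new values take effect.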
