
OSError: Socket closed in celery worker with eventlet #4817

Open
jmaroeder opened this issue Jun 14, 2018 · 27 comments

Comments

@jmaroeder
Contributor

jmaroeder commented Jun 14, 2018

Celery version: 4.2.0

Steps to reproduce

docker-compose.yml

version: '2'

services:
  worker:
    build: .
    depends_on:
      - rabbitmq

  rabbitmq:
    image: rabbitmq:alpine

Dockerfile

FROM alpine

RUN apk add --no-cache build-base python3 python3-dev
RUN pip3 install celery eventlet

CMD celery -b amqp://rabbitmq worker -P eventlet --loglevel=DEBUG

$ docker-compose up --build

Expected behavior

Worker stays connected without errors

Actual behavior

About three minutes after startup, the worker reports several errors with a traceback similar to the following:

[2018-06-14 20:11:44,213: WARNING/MainProcess] Traceback (most recent call last):
[2018-06-14 20:11:44,213: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/eventlet/hubs/poll.py", line 114, in wait
    listener.cb(fileno)
[2018-06-14 20:11:44,214: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/celery/worker/pidbox.py", line 120, in loop
    connection.drain_events(timeout=1.0)
[2018-06-14 20:11:44,214: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 301, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
[2018-06-14 20:11:44,215: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
[2018-06-14 20:11:44,216: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
[2018-06-14 20:11:44,217: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/connection.py", line 496, in blocking_read
    frame = self.transport.read_frame()
[2018-06-14 20:11:44,217: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/transport.py", line 243, in read_frame
    frame_header = read(7, True)
[2018-06-14 20:11:44,218: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/transport.py", line 426, in _read
    raise IOError('Socket closed')
[2018-06-14 20:11:44,218: WARNING/MainProcess] OSError: Socket closed
[2018-06-14 20:11:44,219: WARNING/MainProcess] Removing descriptor: 7

In addition, the rabbitmq server reports warnings similar to the following (repeated twice):

2018-06-14 20:11:44.209 [warning] <0.586.0> closing AMQP connection <0.586.0> (172.19.0.3:42678 -> 172.19.0.2:5672):
missed heartbeats from client, timeout: 60s
@jmaroeder
Contributor Author

Note that when using prefork instead of eventlet, the rabbitmq server still reports the missed heartbeats warning, but there are no warnings printed in the worker:

2018-06-14 20:36:04.807 [warning] <0.698.0> closing AMQP connection <0.698.0> (172.19.0.3:46040 -> 172.19.0.2:5672):
missed heartbeats from client, timeout: 60s

@jmaroeder
Contributor Author

And gevent fails with a similar error message to eventlet:

[2018-06-14 20:43:29,908: WARNING/MainProcess] Traceback (most recent call last):
[2018-06-14 20:43:29,908: WARNING/MainProcess] File "src/gevent/_waiter.py", line 119, in gevent.__waiter.Waiter.switch
[2018-06-14 20:43:29,908: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/celery/worker/pidbox.py", line 120, in loop
    connection.drain_events(timeout=1.0)
[2018-06-14 20:43:29,909: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 301, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
[2018-06-14 20:43:29,909: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
[2018-06-14 20:43:29,909: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
[2018-06-14 20:43:29,910: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/connection.py", line 496, in blocking_read
    frame = self.transport.read_frame()
[2018-06-14 20:43:29,910: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/transport.py", line 243, in read_frame
    frame_header = read(7, True)
[2018-06-14 20:43:29,911: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/transport.py", line 418, in _read
    s = recv(n - len(rbuf))
[2018-06-14 20:43:29,911: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/gevent/_socket3.py", line 380, in recv
    return _socket.socket.recv(self._sock, *args)
[2018-06-14 20:43:29,912: WARNING/MainProcess] ConnectionResetError: [Errno 104] Connection reset by peer
[2018-06-14 20:43:29,912: WARNING/MainProcess] 2018-06-14T20:43:29Z
[2018-06-14 20:43:29,913: WARNING/MainProcess] <built-in method switch of gevent.__greenlet_primitives.TrackedRawGreenlet object at 0x7f791830ce88> failed with ConnectionResetError

@jmaroeder
Contributor Author

Also, I can confirm that I see the same behavior on the latest development versions of all libraries, using this Dockerfile:

FROM alpine

RUN apk add --no-cache build-base python3 python3-dev
RUN pip3 install https://github.com/celery/celery/zipball/master#egg=celery https://github.com/celery/billiard/zipball/master#egg=billiard https://github.com/celery/py-amqp/zipball/master#egg=amqp https://github.com/celery/kombu/zipball/master#egg=kombu https://github.com/celery/vine/zipball/master#egg=vine eventlet gevent

CMD celery -b amqp://rabbitmq worker -P eventlet --loglevel=DEBUG

@wrkhenddher

wrkhenddher commented Jun 15, 2018

This matches what we are experiencing. On a single queue, we have multiple workers with task_acks_late = True and worker_prefetch_multiplier = 1, and a mixture of long-running tasks and short ones.
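
For reference, a minimal sketch of that kind of configuration (the app name and broker URL are placeholders, not our actual setup):

from celery import Celery

app = Celery('myapp', broker='amqp://guest@rabbitmq//')
app.conf.task_acks_late = True            # acknowledge only after the task finishes
app.conf.worker_prefetch_multiplier = 1   # each worker process prefetches a single task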

Switching back to Celery 3.x makes this issue "disappear".

FYI

At first, retries weren't working when using celery 4.1.0 and kombu 4.1.0. After updating them to 4.2.0 and 4.2.1 respectively, retries started to work, but the same timeout messages started to appear and tasks seemed to be delivered correctly yet never processed by the workers. We are using prefork.

-amqp==2.2.2
+amqp==1.4.9
-billiard==3.5.0.3
+billiard==3.3.0.23
-celery==4.2.0
+celery==3.1.23
-kombu==4.2.1
+kombu==3.0.34
-pyramid-celery==3.0.0
+pyramid-celery==2.0.0

@perce7al
Contributor

The broker connection uses the heartbeat setting from the app config by default (since version 4.2.0, #4148).

Configuration and defaults - Documentation

broker_heartbeat

transports supported:

  • pyamqp

Default: 120.0 (negotiated by server).

You can try setting broker_heartbeat = 0. Hope this helps.
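
For example, a minimal sketch of that workaround (the app name and broker URL are placeholders):

from celery import Celery

app = Celery('myapp', broker='amqp://guest@rabbitmq//')
# Disable AMQP heartbeats entirely so the broker never closes the
# connection for missed heartbeats (a workaround, not a fix).
app.conf.broker_heartbeat = 0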

@zibuyu1995

@y0ngdi I set broker_heartbeat = 0; the errors are reduced, but they still occur.

amqp==2.3.2
celery==4.2.0
gevent==1.3.4
greenlet==0.4.13
kombu==4.2.0
The worker reports several errors with a traceback:
[2018-06-22 09:41:21,467: INFO/MainProcess] sync with celery@device_producer_tasks
[2018-06-22 09:41:21,468: ERROR/MainProcess] Control command error: error(32, 'Broken pipe')
Traceback (most recent call last):
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/celery/worker/pidbox.py", line 46, in on_message
    self.node.handle_message(body, message)
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/pidbox.py", line 129, in handle_message
    return self.dispatch(**body)
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/pidbox.py", line 112, in dispatch
    ticket=ticket)
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/pidbox.py", line 135, in reply
    serializer=self.mailbox.serializer)
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/pidbox.py", line 265, in _publish_reply
    **opts
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/messaging.py", line 203, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/amqp/channel.py", line 1732, in _basic_publish
    (0, exchange, routing_key, mandatory, immediate), msg
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/amqp/transport.py", line 275, in write
    self._write(s)
  File "/usr/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe

@pmxly

pmxly commented Jun 25, 2018

I encountered this issue with celery 4.2 and eventlet, but found no proper solution.

@olii
Contributor

olii commented Jun 27, 2018

I did a little investigation. Celery does create a new connection but does not call heartbeat_check for it. This results in the RabbitMQ server closing the connection. I have experienced this error with the control command connection (e.g. stats, ping, ...).

@olii
Contributor

olii commented Jun 27, 2018

Minimal example:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery(
    'myapp',
    broker='amqp://guest@localhost//',
)
app.conf.broker_heartbeat = 5


if __name__ == '__main__':
    app.start()

Run the example

$ python3 example.py worker -l DEBUG

RabbitMQ logs

2018-06-27 19:49:41.109 [info] <0.471.1> accepting AMQP connection <0.471.1> (127.0.0.1:42744 -> 127.0.0.1:5672)
2018-06-27 19:49:41.111 [info] <0.471.1> connection <0.471.1> (127.0.0.1:42744 -> 127.0.0.1:5672): user 'guest' authenticated and granted access to vhost '/'
2018-06-27 19:49:41.116 [info] <0.479.1> accepting AMQP connection <0.479.1> (127.0.0.1:42746 -> 127.0.0.1:5672)
2018-06-27 19:49:41.118 [info] <0.479.1> connection <0.479.1> (127.0.0.1:42746 -> 127.0.0.1:5672): user 'guest' authenticated and granted access to vhost '/'
2018-06-27 19:49:41.131 [info] <0.500.1> accepting AMQP connection <0.500.1> (127.0.0.1:42748 -> 127.0.0.1:5672)
2018-06-27 19:49:41.133 [info] <0.500.1> connection <0.500.1> (127.0.0.1:42748 -> 127.0.0.1:5672): user 'guest' authenticated and granted access to vhost '/'
2018-06-27 19:49:56.135 [warning] <0.500.1> closing AMQP connection <0.500.1> (127.0.0.1:42748 -> 127.0.0.1:5672):
missed heartbeats from client, timeout: 5s

Attempt to send a command after the connection has been closed by RabbitMQ:

$ celery inspect ping

Celery worker log prints this when it receives the command:

[2018-06-27 19:51:25,638: DEBUG/MainProcess] pidbox received method ping() [reply_to:{'exchange': 'reply.celery.pidbox', 'routing_key': '5a7fe1f1-be67-397f-879c-d939ea3c076e'} ticket:d80183f3-a236-4057-841b-6b8cd2926917]
[2018-06-27 19:51:25,639: ERROR/MainProcess] Control command error: ConnectionResetError(104, 'Connection reset by peer')
Traceback (most recent call last):
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/celery/worker/pidbox.py", line 46, in on_message
    self.node.handle_message(body, message)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/pidbox.py", line 129, in handle_message
    return self.dispatch(**body)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/pidbox.py", line 112, in dispatch
    ticket=ticket)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/pidbox.py", line 135, in reply
    serializer=self.mailbox.serializer)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/pidbox.py", line 265, in _publish_reply
    **opts
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/messaging.py", line 194, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/messaging.py", line 194, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/messaging.py", line 102, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/common.py", line 129, in maybe_declare
    return _maybe_declare(entity, declared, ident, channel, orig)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/common.py", line 135, in _maybe_declare
    entity.declare(channel=channel)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/entity.py", line 185, in declare
    nowait=nowait, passive=passive,
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/amqp/channel.py", line 614, in exchange_declare
    wait=None if nowait else spec.Exchange.DeclareOk,
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/amqp/transport.py", line 275, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-06-27 19:51:25,647: DEBUG/MainProcess] Closed channel #2
[2018-06-27 19:51:25,647: DEBUG/MainProcess] using channel_id: 2
[2018-06-27 19:51:25,648: DEBUG/MainProcess] Channel open

Possible cause of error

I think there is a connection that is terminated from the server side because kombu.connection.Connection.heartbeat_check() is not called periodically on that instance.
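
To illustrate, a rough sketch of the kind of periodic check that appears to be missing for that connection (the broker URL, heartbeat interval, and loop are illustrative assumptions, not Celery's actual internals):

import time
from kombu import Connection

conn = Connection('amqp://guest@localhost//', heartbeat=5)
conn.connect()

while True:
    # heartbeat_check() sends/verifies heartbeats on the connection;
    # without calls like this, RabbitMQ closes the connection as shown
    # in the logs above.
    conn.heartbeat_check()
    time.sleep(conn.heartbeat / 2.0)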

@djlambert

I'm seeing the same issue with gevent.

@stojan-jovic

The workaround with broker_heartbeat = 0 works for me.

Installed versions:

amqp==2.2.2
celery==4.2.0
eventlet==0.23.0
kombu==4.2.1

Worker:
celery -A celery_app worker --loglevel=info -P eventlet

P.S. Tested on Windows 10, with Python 2 & 3.

DEKHTIARJonathan pushed a commit to DEKHTIARJonathan/FeedCrunch.IO that referenced this issue Jul 22, 2018
@auvipy auvipy closed this as completed Aug 12, 2018
@DEKHTIARJonathan

@auvipy: Why close this? The issue is not fixed...

@auvipy
Member

auvipy commented Aug 12, 2018

Did the workaround from @stojan-jovic work?

@DEKHTIARJonathan

DEKHTIARJonathan commented Aug 12, 2018

It seems so, but a workaround is not a bugfix.
And it still leads to a ton of time wasted by more and more people every day looking for a solution.

@auvipy
Member

auvipy commented Aug 12, 2018

If you come up with a possible fix, then it will be reopened.

@ldav1s

ldav1s commented Nov 10, 2018

The workaround with broker_heartbeat = 0 seems to work for me with celery 4.2.0 and gevent 1.2.2 (Python 2) / RabbitMQ 3.7.8. At least it didn't hang up the workers at the default 60s timeout.

@auvipy auvipy modified the milestones: v4.3, v5.0.0 Nov 17, 2018
@Eduard-gan

Eduard-gan commented Dec 19, 2018

Django 1.8.4, Python 2.7.12
The issue appeared after upgrading from 3.1.25 to 4.2.1.
Setting CELERY_BROKER_HEARTBEAT = 0 in settings.py solved it.
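
A minimal sketch of where that setting lives, assuming the project loads Celery configuration from Django settings with the CELERY_ namespace (an assumption about this particular setup):

# settings.py
# Only picked up as broker_heartbeat if the Celery app is created with
#   app.config_from_object('django.conf:settings', namespace='CELERY')
CELERY_BROKER_HEARTBEAT = 0  # disable AMQP heartbeats (workaround)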

@auvipy auvipy closed this as completed Dec 21, 2018
@djlambert

@auvipy why did you close this? The problem still exists.

@auvipy auvipy removed this from the v5.0.0 milestone Feb 6, 2019

@gvdmarck

Any updates on this? A workaround is not a bug fix.

@auvipy
Member

auvipy commented May 10, 2019

Feel free to investigate the issue, @gvdmarck; since you are facing it, it would be easier for you to find the root cause.

pauleggleton-intel pushed a commit to intel/clear-linux-dissector-web that referenced this issue May 14, 2019
I have been seeing repeated emailed errors from Django reporting
"ConnectionResetError: [Errno 104] Connection reset by peer" in the call
to get task status i.e:

File "/opt/layerindex/layerindex/views.py" in task_log_view
  1572.         if result.ready():

Digging around this seems to be some sort of known bug:

celery/celery#4817
celery/celery#4980

The workaround suggested is to disable the broker heartbeat, so try
that in order to avoid the errors.

Signed-off-by: Paul Eggleton <paul.eggleton@linux.intel.com>
anelliot pushed a commit to anelliot/layerindex-web that referenced this issue Apr 7, 2020
@grayguest

Django 1.8.4, Python 2.7.12
Issue appeared after upgrade from 3.1.25 to 4.2.1
CELERY_BROKER_HEARTBEAT = 0 in settings.py solved this.

Should it be BROKER_HEARTBEAT instead of CELERY_BROKER_HEARTBEAT?
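
Which name applies depends on how the app reads its configuration; as a hedged sketch of the usual Celery/Django convention (not confirmed for this particular project):

# If the Celery app is created with
#   app.config_from_object('django.conf:settings', namespace='CELERY')
# then the Django setting name is:
CELERY_BROKER_HEARTBEAT = 0

# Older Celery 3.x-style uppercase settings (no namespace) used:
BROKER_HEARTBEAT = 0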

@venu13

venu13 commented Sep 12, 2020

amqp==2.2.2

I am trying to run a celery worker with this configuration, but the worker is not starting up.
The command I am using is as follows:
pipenv run celery worker -A src.celery_app -l debug -P eventlet
I don't see any celery startup log or anything else; it drops straight back to the Windows cmd prompt.

@stojan-jovic, can you let me know how to set up the environment for celery?

@auvipy auvipy modified the milestones: 4.6, 4.5 Jan 3, 2021
@auvipy auvipy modified the milestones: 4.5, Future Feb 17, 2021
@LeadhuntAI

Facing the same issue. To all the people out there who were able to solve this, please share your solutions :)

celery_hearbeat = 0 doesn't work for me..

@auvipy
Member

auvipy commented Dec 12, 2021

@LeadhuntAI can you please try this and let us know: celery/py-amqp#374?

@auvipy auvipy added the pyamqp label Dec 12, 2021
@harshita01398

Is there any fix for this? Observing it on Celery-4.3.1, Kombu-4.6.11, Redis-4.1.2

This happens especially when disable_sync_subtasks=False is used.
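
For context, disable_sync_subtasks is the flag on result.get() that normally prevents waiting on a subtask result from inside another task; a rough sketch of the pattern being described (the task names are made up):

from celery import shared_task

@shared_task
def child_task():
    return 42

@shared_task
def parent_task():
    result = child_task.delay()
    # Blocking on a subtask inside a task: disable_sync_subtasks=False
    # suppresses the usual RuntimeError, but the worker then sits in a
    # long blocking wait, which is the kind of stall others in this
    # thread associate with dropped broker connections.
    return result.get(disable_sync_subtasks=False)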

@LeadhuntAI

@harshita01398 I switched to pool=gevent and it has been running for a long while now without issues (except that sometimes celery switches itself off, i.e. execution in the console just stops, no idea why, but overall it works great).
I am starting it with this command; it runs on Windows and Amazon MQ (which is the same as RabbitMQ) but might help you too:
celery -A YOUR_PROJECT_NAME worker -l info --pool=gevent --concurrency=10 --without-heartbeat --without-gossip --without-mingle
