
Very simple code can't execute, why? #8996

Open
broadcast98 opened this issue May 1, 2024 · 6 comments
@broadcast98

broadcast98 commented May 1, 2024

tasks.py

from celery import Celery

##cmd command: celery -A tasks worker --loglevel=INFO

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

@app.task
def hello():
    return 'hello world'

test_client.py


import time
from tasks import hello

result0 = hello.delay()
print('result0: ', result0)
print('result0.ready: ', result0.ready())

answer0 = result0.get(timeout=5)
print('answer0: ', answer0)

error message:


(env) F:\test\CELERY>python test_client.py

result0:  dfccff15-8c50-4ec7-98e5-bf1cdfdfef24

result0.ready:  False

Traceback (most recent call last):
  File "F:\test\env\Lib\site-packages\celery\backends\asynchronous.py", line 287, in _wait_for_pending
    for _ in self.drain_events_until(
  File "F:\test\env\Lib\site-packages\celery\backends\asynchronous.py", line 52, in drain_events_until
    raise socket.timeout()
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "F:\test\CELERY\test_client.py", line 8, in <module>
    answer0 = result0.get(timeout=5)
              ^^^^^^^^^^^^^^^^^^^^^^
  File "F:\test\env\Lib\site-packages\celery\result.py", line 251, in get
    return self.backend.wait_for_pending(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "F:\test\env\Lib\site-packages\celery\backends\asynchronous.py", line 221, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "F:\test\env\Lib\site-packages\celery\backends\asynchronous.py", line 293, in _wait_for_pending
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.

os: windows 10
python version: 3.12
redis_version:5.0.14.1

I can't find anything wrong in the code.
So what caused the TimeoutError?

@suryan-s

suryan-s commented May 6, 2024

@broadcast98 by checking your output I can see that you haven't started the Celery worker, so the Python client has nothing to execute its task.

This is the proper way to run it:

  1. Open a terminal and run this:
    celery -A task worker --loglevel=INFO
    This will start the Celery worker. You will see the connection output.

    @suryan-s ➜  $ celery -A task worker --loglevel=INFO
     
     -------------- celery@codespaces
    --- ***** ----- 
    -- ******* ---- 
    - *** --- * --- 
    - ** ---------- [config]
    - ** ---------- .> app:         tasks:0x749242cc30a0
    - ** ---------- .> transport:   <your redis info>
    - ** ---------- .> results:     <your redis info>
    - *** --- * --- .> concurrency: 2 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
                    
    
    [task]
      . task.hello
    
    [2024-05-06 12:44:00,314: INFO/MainProcess] mingle: searching for neighbors
    [2024-05-06 12:44:02,343: INFO/MainProcess] mingle: all alone
    [2024-05-06 12:44:04,108: INFO/MainProcess] celery@codespaces-5b0d72 ready.
    
  2. Open another terminal and run the client program:
    python test_client.py

    This is what I got with your example code:

    @suryan-s ➜ $ python test_client.py
     result0:  564bc8b0-ab44-4245-b0ae-82de2fe0bc82
     result0.ready:  True
     answer0:  hello world
    

This worked out fine.

@broadcast98
Author

Thank you.
I have two questions:
1. I did start the Celery worker. I am very sure that I executed "celery -A tasks worker --loglevel=INFO" in one cmd window and "python test_client.py" in another cmd window.
2. If the Celery worker was not started, why did result0 get a task id?

@suryan-s

suryan-s commented May 8, 2024


To answer your questions:

  1. From the sample code you provided, you named your Celery file 'task.py', so to start the worker you should use celery -A task worker --loglevel=INFO and not 'tasks'. So I believe your Celery worker didn't start.
  2. From the code you provided, the Celery instance is configured to use a Redis server as its message broker and result backend. Even if you haven't started the Celery worker, when you call a task function like hello.delay(), Celery sends a message to the Redis broker with the details of the task to be executed. This operation returns an AsyncResult object, which contains an ID for the task. This ID is generated before the task is actually executed by a worker.
    This ID can be used to query the status of the task and retrieve the result once it's ready. If you see an ID, it means the task message was successfully sent to the broker, but it doesn't necessarily mean the task has been executed yet.
    To try an example: if you run the client program without starting the worker, the task to be executed is saved in Redis, and once you start the worker, all the task requests saved in Redis are executed.

Hope this clarifies your doubt! :)
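The point that the id exists before any worker runs can be illustrated conceptually: the id is generated client-side at publish time. This is a toy sketch, not Celery's actual code path; `send_task_message` and `broker` are illustrative names, with a plain list standing in for the Redis queue.

```python
import uuid

# Conceptual sketch, not Celery's implementation: when a task is published,
# the client generates the task id locally, before any worker is involved.
def send_task_message(queue):
    task_id = str(uuid.uuid4())            # id exists before execution
    queue.append({"id": task_id, "task": "tasks.hello"})
    return task_id                         # this is what AsyncResult wraps

broker = []                                # stand-in for the Redis queue
tid = send_task_message(broker)
print(tid)                                 # a UUID, even with no worker running
print(len(broker))                         # the message just waits in the "broker"
```

So seeing an id only proves the message was queued, not that anything consumed it.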

@broadcast98
Author

broadcast98 commented May 8, 2024

The 'task.py' in the previous question was a typographical error. The actual name of my file is 'tasks.py'.

I have tried the entire process again, and now I am posting the complete error message here:

tasks.py

from celery import Celery

##cmd command: celery -A tasks worker --loglevel=INFO

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

@app.task
def hello():
    return 'hello world'

test_client.py


import time
from tasks import hello

result0 = hello.delay()
print('result0: ', result0)
print('result0.ready: ', result0.ready())

answer0 = result0.get(timeout=5)
print('answer0: ', answer0)

error message:


(env) F:\SERVER\home\datad\pyproject\CL>python test_client.py
result0:  98676a10-678c-4c43-8a33-924c6c52930d
result0.ready:  False
Traceback (most recent call last):
  File "F:\SERVER\home\datad\pyproject\env\Lib\site-packages\celery\backends\asynchronous.py", line 287, in _wait_for_pending
    for _ in self.drain_events_until(
  File "F:\SERVER\home\datad\pyproject\env\Lib\site-packages\celery\backends\asynchronous.py", line 52, in drain_events_until
    raise socket.timeout()
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "F:\SERVER\home\datad\pyproject\CL\test_client.py", line 10, in <module>
    answer0 = result0.get(timeout=5)
              ^^^^^^^^^^^^^^^^^^^^^^
  File "F:\SERVER\home\datad\pyproject\env\Lib\site-packages\celery\result.py", line 251, in get
    return self.backend.wait_for_pending(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "F:\SERVER\home\datad\pyproject\env\Lib\site-packages\celery\backends\asynchronous.py", line 221, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "F:\SERVER\home\datad\pyproject\env\Lib\site-packages\celery\backends\asynchronous.py", line 293, in _wait_for_pending
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.


(env) F:\SERVER\home\datad\pyproject\CL>celery -A tasks worker --loglevel=INFO

 -------------- celery@DESKTOP-9A4E2IQ v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Windows-10-10.0.19045-SP0 2024-05-08 18:23:46
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x2299ab63230
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.hello

[2024-05-08 18:23:46,647: WARNING/MainProcess] F:\SERVER\home\datad\pyproject\env\Lib\site-packages\celery\worker\consumer\consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-05-08 18:23:47,295: INFO/SpawnPoolWorker-1] child process 25240 calling self.run()
[2024-05-08 18:23:47,301: INFO/SpawnPoolWorker-3] child process 12388 calling self.run()
[2024-05-08 18:23:47,316: INFO/SpawnPoolWorker-2] child process 35172 calling self.run()
[2024-05-08 18:23:47,320: INFO/SpawnPoolWorker-4] child process 38828 calling self.run()
[2024-05-08 18:23:48,000: INFO/SpawnPoolWorker-6] child process 37456 calling self.run()
[2024-05-08 18:23:48,009: INFO/SpawnPoolWorker-5] child process 35412 calling self.run()
[2024-05-08 18:23:48,739: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-05-08 18:23:48,740: WARNING/MainProcess] F:\SERVER\home\datad\pyproject\env\Lib\site-packages\celery\worker\consumer\consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-05-08 18:23:50,781: INFO/MainProcess] mingle: searching for neighbors
[2024-05-08 18:23:57,933: INFO/MainProcess] mingle: all alone
[2024-05-08 18:24:08,152: INFO/MainProcess] celery@DESKTOP-9A4E2IQ ready.
[2024-05-08 18:24:49,967: INFO/MainProcess] Task tasks.hello[98676a10-678c-4c43-8a33-924c6c52930d] received
[2024-05-08 18:24:50,564: INFO/SpawnPoolWorker-7] child process 35864 calling self.run()

and I used the code below to view the task result:

from celery.result import AsyncResult  # import was missing from the snippet

task_id = '98676a10-678c-4c43-8a33-924c6c52930d'  # replace with your task ID
result = AsyncResult(task_id)

if result.status == 'SUCCESS':  
    print(f"Task succeeded with result: {result.result}")  
elif result.status == 'FAILURE':  
    print(f"Task failed with exception: {result.traceback}")  
else:  
    print(f"Task status: {result.status}")

the result is:

(env) F:\SERVER\home\datad\pyproject\CL>python test_viewresult.py
Task status: PENDING

@suryan-s

suryan-s commented May 8, 2024

@broadcast98 in the Celery worker output I can see that tasks.hello is being received. Alongside that, the output of test_viewresult.py shows PENDING. Could you run python test_viewresult.py again?

Also, I noticed this in the worker output:

- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/

Is the result backend URL incorrect, or have you edited it out to hide the details?
Since you are using Redis for both the broker and the backend, they should point at the same URL. In your case it should be configured as 'redis://localhost:6379/0', where 0 is the Redis database index.

@suryan-s

suryan-s commented May 8, 2024

@broadcast98 I have also modified test_client.py and test_viewresult.py into the following, and it is working fine:

from tasks import hello
import time
from celery.result import AsyncResult

def check_task_status(task_id):
    try:
        result = AsyncResult(task_id)
        if result.successful():
            print(f"Task {task_id} succeeded with result: {result.result}")
        elif result.failed():
            print(f"Task {task_id} failed with exception: {result.traceback}")
        elif result.status == 'PENDING':
            print(f"Task {task_id} is pending")
        elif result.status == 'STARTED':
            print(f"Task {task_id} has started")
        elif result.status == 'RETRY':
            print(f"Task {task_id} is being retried")
        elif result.status == 'REVOKED':
            print(f"Task {task_id} has been revoked")
        else:
            print(f"Task {task_id} status: {result.status}")
    except Exception as e:
        print(f"Error occurred while checking task {task_id}: {e}")

response = hello.delay()
print('response: ', response)

while True:
    if response.ready():
        break
    print('Task is not ready yet')
    time.sleep(1)


# Check the status of the Celery task
check_task_status(response.id)

This worked fine, and I got the details below:

@suryan-s ➜ /workspaces/codespaces-blank $ python test_client.py
response:  03a8208d-bc3c-4354-ae8b-35f75e747d33
Task 03a8208d-bc3c-4354-ae8b-35f75e747d33 succeeded with result: hello world
details:  hello world
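One caveat about the loop above: if the worker never picks the task up (the very situation in this issue), response.ready() stays False and the loop spins forever. A bounded wait can be sketched in plain Python; wait_until, poll, and fake_ready are illustrative names, not Celery API:

```python
import time

def wait_until(poll, timeout=10.0, interval=0.5):
    """Call poll() repeatedly until it returns True or timeout seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if poll():
            return True
        time.sleep(interval)
    return False  # gave up: surface this instead of looping forever

# Stand-in for response.ready(): flips to True on the third poll.
calls = {"n": 0}
def fake_ready():
    calls["n"] += 1
    return calls["n"] >= 3

print(wait_until(fake_ready, timeout=5.0, interval=0.01))  # True
```

In the real client you would pass lambda: response.ready() as poll and treat a False return as "worker never ran the task".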

Also, add this to your tasks.py:

app = Celery('tasks', broker=broker_url, backend=result_backend)
app.conf.broker_connection_retry_on_startup = True
app.conf.backend_connection_retry_on_startup = True

This can be useful for ensuring that the Celery worker can establish a connection to the message broker upon startup, especially in scenarios where the message broker might not be immediately available.
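Putting the two suggestions together, tasks.py might look like this (a sketch: the URLs are examples, so adjust them to your Redis setup):

```python
from celery import Celery

# Explicit port and database index on both URLs, per the suggestion above,
# so broker and backend point at the same Redis database.
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)
# Retry the broker connection on startup; this also silences the
# Celery 6.0 deprecation warning seen in the worker log above.
app.conf.broker_connection_retry_on_startup = True

@app.task
def hello():
    return 'hello world'
```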
