On python 3.8 the query for 'between' in a table CANCEL TASK.... #222

Open
dpineiden opened this issue Aug 18, 2020 · 6 comments
Labels
bug (Something isn't working), not qualified (The issue is not checked yet by the owners)

Comments

@dpineiden
Contributor

dpineiden commented Aug 18, 2020

Describe the bug

The table has extra (secondary) indexes: DT_GEN (a date) and COUNT (an int), both tested.

The 'between' query follows the documentation. It is in https://gitlab.com/pineiden/datadbs-rethinkdb/-/blob/master/data_rdb/rdb.py, from line 316 onwards:

                    table_manager = self.r.db(dbname).table(table_name)
                    task_coro = await table_manager.between(
                        lower, upper, **filter_opt).order_by(order_by).run(
                            self.session, **options)
                    result = await task_coro

This produces the following error:

Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<ConnectionInstance._reader() running at /home/david/.virtualenvs/datawork/lib/python3.8/site-packages/rethinkdb-2.4.7-py3.8.egg/rethinkdb/asyncio_net/net_asyncio.py:313> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7d5a713df0>()]>>
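
For context, asyncio prints "Task was destroyed but it is pending!" when a Task object is garbage-collected while it is still pending, which usually means a background task was never cancelled or awaited before its owner went away. The sketch below is not the driver's code: `reader` and `main` are hypothetical names, and it only illustrates, under that assumption, how cancelling and then awaiting a background task lets it finish cleanly.

```python
import asyncio


async def reader(queue: asyncio.Queue) -> None:
    # Hypothetical stand-in for a long-running background reader.
    while True:
        await queue.get()


async def main() -> bool:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(reader(queue))
    await asyncio.sleep(0)  # let the reader start and block on the queue

    # Cancel the task and wait for it, so it is not left pending.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


cancelled = asyncio.run(main())
print(cancelled)
```

Whether the driver's `_reader` task is affected in exactly this way is not confirmed here; the sketch only shows the general asyncio pattern.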

To Reproduce

Set up RethinkDB 2.4.1
Install Python 3.8
Create a virtualenv

Install the package to pull in its dependencies, then uninstall it so that it can be installed from the repo:

pip install data-rdb
pip uninstall data-rdb

Now clone the repo from:
https://gitlab.com/pineiden/datadbs-rethinkdb

and run

python setup.py develop

Then, in the tests folder, run

python load_data.py

And

python read_data.py

These scripts load dummy data into a table and read it back in real time.

Expected behavior

Get the data from the table

Screenshots
https://gitlab.com/-/snippets/2005380

Additional context

The core of the query, using only the rethinkdb module, on Python 3.6.9:

from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb

host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
conn = loop.run_until_complete(rdb.connect(db='test', host=host, port=port))
first = datetime.now() - timedelta(seconds=200)
tz = timezone("America/Santiago")
firsttz = tz.localize(first)
firstrdb = rdb.iso8601(firsttz.isoformat())
post = datetime.now()
posttz = tz.localize(post)
postrdb = rdb.iso8601(posttz.isoformat())
table = "STATION"
filter_opt = {'left_bound': 'open', 'index': "DT_GEN"}
print(f"From {firstrdb}, to {postrdb}")
query = rdb.db("test").table(table).between(firstrdb, postrdb,
                                            **filter_opt).run(conn)
result = loop.run_until_complete(query)
print(result)

dpineiden added the labels bug and not qualified on Aug 18, 2020

@dpineiden
Contributor Author

dpineiden commented Aug 18, 2020

Now working on debugging.

This works (using the secondary index COUNTER) on both Python 3.6 and Python 3.8:


from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb
from tasktools.taskloop import coromask, renew, simple_fargs_out
from functools import partial

host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
filter_opt = {'left_bound': 'open', 'index': "COUNTER"}
table = "STATION"


def date_rdb(td=0):
    first = datetime.utcnow() + timedelta(seconds=td)
    tz = timezone("UTC")
    firsttz = tz.localize(first)
    firstrdb = rdb.iso8601(firsttz.isoformat())
    return firstrdb


async def run(*args, **kwargs):
    connected = args[0]
    firstrdb = args[1]
    val = args[1]
    conn = args[2]
    if not connected:
        conn = await rdb.connect(db='test', host=host, port=port)
        connected = True
    if connected:
        await asyncio.sleep(5)
        val2 = val + 20
        postrdb = date_rdb()
        print(f"From {firstrdb}, to {postrdb}")
        query = rdb.db("test").table(table).between(val, val2,
                                                    **filter_opt).run(conn)
        result = await query
        print(result)
    return [connected, val2, conn], kwargs


connected = False
now = date_rdb()
conn = None
val = 0
#args = [connected, now, conn]
args = [connected, val, conn]
kwargs: dict = {}

task = loop.create_task(coromask(run, args, kwargs, simple_fargs_out))
task.add_done_callback(partial(renew, task, run, simple_fargs_out))

loop.run_forever()

@dpineiden
Contributor Author

Using 'filter' instead of 'between', the datetime query works fine on Python 3.8:

"""
Index: COUNTER
Python 3.6
Python 3.8
"""

from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb
from tasktools.taskloop import coromask, renew, simple_fargs_out
from functools import partial

host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
filter_opt = {'left_bound': 'open', 'index': "COUNTER"}
table = "STATION"


def date_rdb(td=0):
    first = datetime.utcnow() + timedelta(seconds=td)
    tz = timezone("UTC")
    firsttz = tz.localize(first)
    firstrdb = rdb.iso8601(firsttz.isoformat())
    return firstrdb


async def run(*args, **kwargs):
    connected = args[0]
    firstrdb = args[1]
    postrdb = firstrdb
    conn = args[2]
    if not connected:
        conn = await rdb.connect(db='test', host=host, port=port)
        connected = True
    if connected:
        await asyncio.sleep(5)
        val2 = val + 20
        print(f"From {firstrdb}, to {postrdb}")
        query = rdb.db("test").table(table).filter(
            lambda data: data["DT_GEN"] >= firstrdb).coerce_to(
                "array").order_by("DT_GEN").run(conn)
        result = await query
        if result:
            lista = []
            for q in result:
                print(q)
                lista.append(q)
            postrdb = lista[-1]["DT_GEN"]
            print("Last data", postrdb)
    return [connected, postrdb, conn], kwargs


connected = False
now = date_rdb()
conn = None
val = 0
args = [connected, now, conn]
kwargs: dict = {}

task = loop.create_task(coromask(run, args, kwargs, simple_fargs_out))
task.add_done_callback(partial(renew, task, run, simple_fargs_out))

loop.run_forever()

@dpineiden
Contributor Author

Comparing the two, I think 'between' is better than 'filter' for real-time use: 'filter' reads the whole table every time.

I unpacked the dictionary of options into explicit keyword arguments, and it is working OK.

"""
Index: COUNTER
Python 3.6
Python 3.8
"""

from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb
from tasktools.taskloop import TaskLoop

host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
filter_opt = {'left_bound': 'open', 'index': "COUNTER"}
table = "STATION"


def date_rdb(td=0):
    first = datetime.utcnow() + timedelta(seconds=td)
    tz = timezone("UTC")
    firsttz = tz.localize(first)
    firstrdb = rdb.iso8601(firsttz.isoformat())
    return firstrdb


async def run(*args, **kwargs):
    connected = args[0]
    firstrdb = args[1]
    conn = args[2]
    if not connected:
        conn = await rdb.connect(db='test', host=host, port=port)
        connected = True
    if connected:
        await asyncio.sleep(5)
        postrdb = date_rdb()
        print(f"From {firstrdb}, to {postrdb}")
        query = rdb.db("test").table(table).between(
            firstrdb,
            postrdb,
            index="DT_GEN",
            left_bound="closed",
            right_bound="open").coerce_to("array").order_by("DT_GEN").run(conn)
        result = await query
        if result:
            lista = []
            for q in result:
                print(q)
                lista.append(q)
            postrdb = lista[-1]["DT_GEN"]
            print("Last data", postrdb)
    return [connected, postrdb, conn], kwargs


connected = False
now = date_rdb()
conn = None
args = [connected, now, conn]
kwargs: dict = {}

task = TaskLoop(run, args, kwargs, **{"name": "test_query_dt_gen"})
task.create()
loop.run_forever()
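
To illustrate the comparison above without a database, here is a minimal pure-Python analogy. It is a sketch, not RethinkDB's implementation: `rows`, `keys`, `scan_filter` and `index_between` are hypothetical names, and the sorted `keys` list only plays the role of a secondary index. A 'filter'-style scan tests every row, while a 'between'-style lookup with a closed left bound and an open right bound binary-searches the sorted keys and slices out the matching range.

```python
from bisect import bisect_left

# Hypothetical in-memory "table": rows sorted by the indexed key.
rows = [{"COUNTER": n, "value": n * n} for n in range(0, 100, 5)]
keys = [row["COUNTER"] for row in rows]  # plays the role of a secondary index


def scan_filter(lower, upper):
    # Like 'filter': test every row in the table.
    return [row for row in rows if lower <= row["COUNTER"] < upper]


def index_between(lower, upper):
    # Like 'between' with left_bound="closed", right_bound="open":
    # binary-search the sorted keys and slice only the matching range.
    start = bisect_left(keys, lower)
    stop = bisect_left(keys, upper)
    return rows[start:stop]


first = index_between(0, 20)
second = index_between(20, 40)
print([r["COUNTER"] for r in first])
print([r["COUNTER"] for r in second])
```

With half-open windows like these, consecutive ranges neither repeat nor skip a row, which is why this bound choice suits periodic polling.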

@dpineiden
Contributor Author

Next, going beyond a single connection: when I activate more connections, one or another of them somehow breaks with the 'Task cancelled' error.
So I am now using wait_for and await sleep from asyncio to control that. But I don't think I fully understand this bug yet.
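
As a rough idea of what bounding a query with wait_for can look like, here is a minimal sketch. It does not use the rethinkdb driver: `slow_query` and `run_with_timeout` are hypothetical names, and `asyncio.sleep` stands in for the time a real query would take.

```python
import asyncio


async def slow_query(delay: float) -> str:
    # Hypothetical stand-in for awaiting a database query.
    await asyncio.sleep(delay)
    return "rows"


async def run_with_timeout(delay: float, timeout: float):
    try:
        return await asyncio.wait_for(slow_query(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the inner coroutine before raising.
        return None


async def main():
    fast = await run_with_timeout(0.01, timeout=1.0)
    slow = await run_with_timeout(1.0, timeout=0.05)
    return fast, slow


fast, slow = asyncio.run(main())
print(fast, slow)
```

This keeps a stuck query from blocking the loop forever, but it does not explain why the task was cancelled in the first place.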

@dpineiden
Contributor Author

OK. Now I'm reading the code.
The file 'net_asyncio.py' needs some fixes. According to the documentation for StreamReader and StreamWriter, an extra step is needed when writing to the streams and when closing them.

https://docs.python.org/3/library/asyncio-stream.html#asyncio.StreamWriter

For example, to write (send a query):

stream.write(data)
await stream.drain()

Or to close:

stream.close()
await stream.wait_closed()

Working with sockets is very very tricky :)
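
The two patterns above can be tried end to end with a small local echo server. This is a self-contained sketch using only asyncio streams, not a patch for 'net_asyncio.py'; `handle_echo` and `main` are hypothetical names, and it assumes a loopback interface is available.

```python
import asyncio


async def handle_echo(reader, writer):
    # Echo one line back, then close the server side of the connection.
    data = await reader.readline()
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def main() -> bytes:
    # Port 0 asks the OS for any free port on localhost.
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"ping\n")
    await writer.drain()        # wait until it is appropriate to resume writing
    reply = await reader.readline()

    writer.close()
    await writer.wait_closed()  # wait until the stream is closed

    server.close()
    await server.wait_closed()
    return reply


reply = asyncio.run(main())
print(reply)
```

Note that `StreamWriter.wait_closed()` was added in Python 3.7, so it is available on both versions discussed in this thread only from 3.7 upwards.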

@bashalex

bashalex commented Nov 6, 2020

I am experiencing the same bug, it's pretty annoying.
Hope it gets resolved soon!
