Redesign how this plugin handles parsing CSV and writing to the DB #38

Closed
simonw opened this issue Jan 24, 2024 · 11 comments
Labels
enhancement New feature or request

Comments

simonw commented Jan 24, 2024

The plugin currently works by kicking off a long-running operation in the single "write" thread and parsing the CSV file entirely within that operation:

await db.execute_write_fn(insert_docs_catch_errors, block=False)

I'm having trouble getting the tests to pass against Datasette 1.0 - see #36 - which has made me think that this might not be the best approach.

I'd rather not tie up the write connection for so long - ideally I'd like the parsing to happen in a separate thread with rows written to the database 100 at a time or so.

I'm not entirely sure how to do that, so I'll likely get a good TIL out of it.

simonw added the enhancement label Jan 24, 2024

simonw commented Jan 24, 2024

Looks like I'll need asyncio.run_coroutine_threadsafe(): https://docs.python.org/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe

asyncio.run_coroutine_threadsafe(coro, loop)

Unlike other asyncio functions this function requires the loop argument to be passed explicitly.

Also relevant: https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading

This is a bit confusing though. There's also loop.call_soon(callback, *args) and its thread-safe variant loop.call_soon_threadsafe(callback, *args) which both take regular - not async def - Python functions and schedule them to be called on the event loop.
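
To make the difference concrete, here's a minimal standalone sketch (not plugin code) that schedules both a coroutine and a plain callback onto a running loop from a worker thread:

```python
import asyncio
import threading


async def async_write(value):
    # A coroutine, scheduled from the worker thread via run_coroutine_threadsafe()
    print("coroutine saw:", value)


def plain_callback(value):
    # A regular function, scheduled via call_soon_threadsafe()
    print("callback saw:", value)


def worker(loop):
    # Schedule a coroutine onto the loop and block until it has finished
    future = asyncio.run_coroutine_threadsafe(async_write("from coroutine"), loop)
    future.result()
    # Schedule a plain (non-async) callback; this returns a Handle, not a Future,
    # so there is nothing to wait on for its result
    loop.call_soon_threadsafe(plain_callback, "from callback")


async def main():
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=worker, args=(loop,))
    thread.start()
    # Give the scheduled callback a chance to run before the loop exits
    await asyncio.sleep(0.1)
    thread.join()


asyncio.run(main())
```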

simonw commented Jan 24, 2024

I can use asyncio.get_running_loop() to get the current loop. Note that this can only be called in an async def function or a callback that has been passed to something like loop.call_soon() - if you call it outside of those you get this error:

>>> import asyncio
>>> asyncio.get_running_loop()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
RuntimeError: no running event loop
>>> 
>>> 
>>> import asyncio
>>> 
>>> def callback():
...     loop = asyncio.get_running_loop()
...     print("Current loop inside callback:", loop)
... 
>>> async def main():
...     loop = asyncio.get_running_loop()
...     loop.call_soon(callback)
... 
>>> asyncio.run(main())
Current loop inside callback: <_UnixSelectorEventLoop running=True closed=False debug=False>

simonw commented Jan 24, 2024

I'm going to try running the CSV parsing entirely in a dedicated thread. I'll start a new thread for each upload - since this is only available to signed-in users I'm not worried about thousands of concurrent uploads starving threads.
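
The shape I have in mind is roughly this (a sketch; start_upload is a placeholder name, and parse_csv_in_thread is the worker function that shows up in the diff in the next comment):

```python
import asyncio


async def start_upload(csv_file, db, table_name, task_id):
    # Grab the running loop so the worker thread can schedule writes back onto it
    loop = asyncio.get_running_loop()
    # run_in_executor() runs the blocking parser in the default thread pool,
    # one worker per upload, without blocking the event loop
    loop.run_in_executor(
        None, parse_csv_in_thread, loop, csv_file, db, table_name, task_id
    )
```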

simonw commented Jan 25, 2024

I got a threaded version working but it feels a bit weird. I uploaded a 175MB CSV file through it and it seemed to work... but once the file had uploaded and the progress bar showed it as fully processed, hitting "refresh" on the table page showed the row count still climbing.

I think the thread just crammed a huge number of write operations into the in-memory queue and marked the upload as complete, and those queued writes then kept being processed afterwards.

Here's the diff:

diff --git a/.gitignore b/.gitignore
index bc23806..aac8831 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@ __pycache__/
 *.py[cod]
 *$py.class
 venv
+venv-1
 .eggs
 .pytest_cache
 *.egg-info
diff --git a/datasette_upload_csvs/__init__.py b/datasette_upload_csvs/__init__.py
index b15f26b..5a5d6cd 100644
--- a/datasette_upload_csvs/__init__.py
+++ b/datasette_upload_csvs/__init__.py
@@ -1,3 +1,4 @@
+import asyncio
 from datasette import hookimpl
 from datasette.utils.asgi import Response, Forbidden
 from charset_normalizer import detect
@@ -10,6 +11,7 @@ import io
 import os
 import sqlite_utils
 from sqlite_utils.utils import TypeTracker
+import threading
 import uuid
 
 
@@ -124,57 +126,105 @@ async def upload_csvs(scope, receive, datasette, request):
 
     await db.execute_write_fn(insert_initial_record)
 
-    def insert_docs(database):
-        reader = csv_std.reader(codecs.iterdecode(csv.file, encoding))
-        headers = next(reader)
-
-        tracker = TypeTracker()
+    # We run the CSV parser in a thread, sending 100 rows at a time to the DB
+    def parse_csv_in_thread(event_loop, csv_file, db, table_name, task_id):
+        try:
+            reader = csv_std.reader(codecs.iterdecode(csv_file, encoding))
+            headers = next(reader)
+
+            tracker = TypeTracker()
+
+            docs = tracker.wrap(dict(zip(headers, row)) for row in reader)
+
+            i = 0
+
+            def docs_with_progress():
+                nonlocal i
+                for doc in docs:
+                    i += 1
+                    yield doc
+                    if i % 10 == 0:
+
+                        def update_progress(conn):
+                            database = sqlite_utils.Database(conn)
+                            database["_csv_progress_"].update(
+                                task_id,
+                                {
+                                    "rows_done": i,
+                                    "bytes_done": csv_file.tell(),
+                                },
+                            )
+
+                        asyncio.run_coroutine_threadsafe(
+                            db.execute_write_fn(update_progress), event_loop
+                        )
+
+            def write_batch(batch):
+                def insert_batch(conn):
+                    database = sqlite_utils.Database(conn)
+                    database[table_name].insert_all(batch, alter=True)
+
+                asyncio.run_coroutine_threadsafe(
+                    db.execute_write_fn(insert_batch), event_loop
+                )
 
-        docs = tracker.wrap(dict(zip(headers, row)) for row in reader)
+            batch = []
+            batch_size = 0
+            for doc in docs_with_progress():
+                batch.append(doc)
+                batch_size += 1
+                if batch_size > 100:
+                    write_batch(batch)
+                    batch = []
+                    batch_size = 0
+
+            if batch:
+                write_batch(batch)
+
+            # Mark progress as complete
+            def mark_complete(conn):
+                nonlocal i
+                database = sqlite_utils.Database(conn)
+                database["_csv_progress_"].update(
+                    task_id,
+                    {
+                        "rows_done": i,
+                        "bytes_done": total_size,
+                        "completed": str(datetime.datetime.utcnow()),
+                    },
+                )
 
-        i = 0
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(mark_complete), event_loop
+            )
 
-        def docs_with_progress():
-            nonlocal i
-            for doc in docs:
-                i += 1
-                yield doc
-                if i % 10 == 0:
-                    database["_csv_progress_"].update(
-                        task_id,
-                        {
-                            "rows_done": i,
-                            "bytes_done": csv.file.tell(),
-                        },
-                    )
+            # Transform columns to detected types
+            def transform_columns(conn):
+                database = sqlite_utils.Database(conn)
+                database[table_name].transform(types=tracker.types)
 
-        database[table_name].insert_all(
-            docs_with_progress(), alter=True, batch_size=100
-        )
-        database["_csv_progress_"].update(
-            task_id,
-            {
-                "rows_done": i,
-                "bytes_done": total_size,
-                "completed": str(datetime.datetime.utcnow()),
-            },
-        )
-        # Transform columns to detected types
-        database[table_name].transform(types=tracker.types)
-        return database[table_name].count
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(transform_columns), event_loop
+            )
+        except Exception as error:
 
-    def insert_docs_catch_errors(conn):
-        database = sqlite_utils.Database(conn)
-        with conn:
-            try:
-                insert_docs(database)
-            except Exception as error:
+            def insert_error(conn):
+                database = sqlite_utils.Database(conn)
                 database["_csv_progress_"].update(
                     task_id,
                     {"error": str(error)},
                 )
 
-    await db.execute_write_fn(insert_docs_catch_errors, block=False)
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(insert_error), event_loop
+            )
+
+    loop = asyncio.get_running_loop()
+
+    # Start that thread running in the default executor in the background
+    loop.run_in_executor(
+        None, parse_csv_in_thread, loop, csv.file, db, table_name, task_id
+    )
 
     if formdata.get("xhr"):
         return Response.json(

simonw commented Jan 25, 2024

Also, the tests pass, but I get a huge number of lines like this:

task: <Task pending name='Task-193' coro=<Database.execute_write_fn() done, defined at /Users/simon/.local/share/virtualenvs/datasette-upload-csvs-Tx5Sw3VW/lib/python3.10/site-packages/datasette/database.py:162> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_chain_future.<locals>._call_set_state() at /Users/simon/.pyenv/versions/3.10.4/lib/python3.10/asyncio/futures.py:391]>
Task was destroyed but it is pending!
task: <Task pending name='Task-194' coro=<Database.execute_write_fn() done, defined at /Users/simon/.local/share/virtualenvs/datasette-upload-csvs-Tx5Sw3VW/lib/python3.10/site-packages/datasette/database.py:162> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_chain_future.<locals>._call_set_state() at /Users/simon/.pyenv/versions/3.10.4/lib/python3.10/asyncio/futures.py:391]>

Resulting file was pretty big prior to a vacuum too, but that may be from previous unrelated experiments:

$ ls -lah temp.db
-rw-r--r--@ 1 simon  staff   458M Jan 24 16:07 temp.db
$ sqlite-utils vacuum temp.db
$ ls -lah temp.db            
-rw-r--r--@ 1 simon  staff   280M Jan 24 16:10 temp.db

simonw commented Jan 25, 2024

I think this may be a classic queue problem. It turns out that parsing the CSV file as fast as possible produces writes faster than the database can keep up with. What we actually want is to send those writes at a pace slightly slower than what the DB can handle, keeping some write capacity free for anything else that might be going on.

I may need to use https://pypi.org/project/janus/ - which is already used elsewhere in Datasette.

Actually it turns out it's only used in one place in Datasette: to set up a tiny queue on which to send a message back when you perform a blocking write operation: https://github.com/simonw/datasette/blob/7a5adb592ae6674a2058639c66e85eb1b49448fb/datasette/database.py#L201-L210

Note that setting it up with janus.Queue(maxsize=100) should cause writes to the queue to block until there is some space in it once that maximum size has been hit.
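
As a rough illustration of that backpressure idea (a standalone sketch, not the plugin's code), a bounded janus.Queue lets a plain producer thread block in sync_q.put() whenever the async consumer falls behind:

```python
import asyncio
import threading

import janus  # assumed installed: pip install janus


def producer(sync_q):
    # Runs in a plain thread; put() blocks once maxsize items are waiting,
    # which is the backpressure against a slow consumer
    for i in range(1000):
        sync_q.put({"row": i})
    sync_q.put(None)  # sentinel to signal completion


async def consumer(async_q):
    while True:
        item = await async_q.get()
        async_q.task_done()
        if item is None:
            break
        # ... write the item (or batch) to the database here ...


async def main():
    queue = janus.Queue(maxsize=100)
    thread = threading.Thread(target=producer, args=(queue.sync_q,))
    thread.start()
    await consumer(queue.async_q)
    thread.join()
    queue.close()
    await queue.wait_closed()


asyncio.run(main())
```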

simonw pinned this issue Jan 25, 2024

simonw commented Jan 25, 2024

Thinking more about this.

The goal here is to have batches of rows from the CSV file written to the database as quickly as possible while still keeping the server able to do other things.

I think the best way to do this is to send batches of 100 rows at a time to the db.execute_write_fn() function - maybe even with a tiny artificial sleep between batches to give other writes more of a chance to grab a turn?

The file has already been uploaded and saved to the temporary directory before the import kicks off. It would be nice if parsing could start happening while the file was still uploading but I'm OK ignoring that for the moment.

Is CSV parsing an I/O or a CPU bound activity? I'm going to guess it's I/O bound, which means that running it in a regular thread should still provide a performance benefit.

So I think I want a regular thread to run the CSV parsing, which sends 100 rows at a time to db.execute_write_fn().

The biggest question though is whether those calls should be blocking, such that the file reading operation pauses until the batch has been written to the database.

I didn't make them blocking in my first version of this, which resulted in the weird behaviour where the writes continued long after the progress bar had finished.

I'm going to try making them blocking writes instead. That means I need to figure out how to perform a blocking write from a thread, even though that thread isn't running as async def code that can await the call.

simonw commented Jan 25, 2024

I think I can use this pattern for that:

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

So maybe I just need to call future.result() without a timeout to wait for that thing to finish writing to the DB?
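
A blocking version of the write_batch() helper from the earlier diff might then look roughly like this (a sketch, assuming db is Datasette's Database object with its execute_write_fn() coroutine):

```python
import asyncio

import sqlite_utils


def write_batch_blocking(db, loop, table_name, batch):
    # Runs in the CSV-parsing thread, not on the event loop
    def insert_batch(conn):
        sqlite_utils.Database(conn)[table_name].insert_all(batch, alter=True)

    # Schedule the write coroutine onto the event loop from this thread...
    future = asyncio.run_coroutine_threadsafe(
        db.execute_write_fn(insert_batch), loop
    )
    # ...then block until it completes, so the parser can never race
    # ahead of what the database has actually written
    return future.result()
```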

simonw commented Jan 25, 2024

Adding that future.result() call really did do the job!

[image: progress]

The server stayed fully responsive during the upload.

simonw commented Jan 25, 2024

Weird, still that same test failure where types are not converted on some Python versions for Datasette 1.0.

simonw added a commit that referenced this issue Jan 30, 2024
* Test against Datasette pre and post 1.0a7
* Show Datasette version in pytest header
* Run everything in a transaction
* Run CSV parsing in a separate async task, refs #38
* fail-fast: false
* timeout-minutes: 1 on pytest steps

simonw commented Jan 30, 2024

I ended up running the CSV parsing in an asyncio task rather than a thread, yielding every 100 records.
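
Roughly speaking (a simplified sketch of that final shape, not the exact committed code), the parse/insert loop becomes an async function that awaits a write every 100 rows, which is where it yields control back to the event loop:

```python
import sqlite_utils


async def parse_and_insert(db, docs, table_name, batch_size=100):
    def insert_batch(conn, batch):
        sqlite_utils.Database(conn)[table_name].insert_all(batch, alter=True)

    batch = []
    for doc in docs:
        batch.append(doc)
        if len(batch) >= batch_size:
            # Awaiting the write yields control back to the event loop,
            # so the server stays responsive during a large upload
            await db.execute_write_fn(lambda conn, b=batch: insert_batch(conn, b))
            batch = []
    if batch:
        await db.execute_write_fn(lambda conn, b=batch: insert_batch(conn, b))


# Kicked off without blocking the request handler, e.g.:
# asyncio.create_task(parse_and_insert(db, docs, table_name))
```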

simonw closed this as completed Jan 30, 2024
simonw added a commit that referenced this issue Jan 30, 2024