Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For await syntax #127

Open
woubuc opened this issue Sep 28, 2020 · 10 comments
Open

For await syntax #127

woubuc opened this issue Sep 28, 2020 · 10 comments

Comments

@woubuc
Copy link

woubuc commented Sep 28, 2020

Given that the for await syntax is a thing now, I think it would be very useful if the .each() method returned an async iterator instead of using a classical callback.

@theogravity
Copy link
Collaborator

Not sure how it would work - the reason for the existing implementation is because sqlite3 has two separate callbacks - one for when a row is fetched and another for when the entire operation is done, giving back the number of rows returned.

Altering each to return an iterator is tricky as a result, and also breaks the existing API.

I would be open to a new API method (like asyncEach) that returns an iterator if you want to try your hand at it.

@PaulKiddle
Copy link
Contributor

This is difficult because the row callback for each is synchronous, but for await would require an aync callback (even if you don't actually use any async methods in the for body).
To achieve this result you'd have to keep each row in a queue and wait for the row callback to finish before emitting the next result.

Something like this should do it:

function resultQueue(){
  let returnValue;
  const q = [];
  return {
    getNext(){
      if(q.length){
        return q.shift();
      }
      return new Promise(resolve => {
        returnValue = v => {
          returnValue = null;
          resolve(v);
        };
      });
    },

    setNext(v){
      if(returnValue) {
        returnValue(v);
      } else {
        q.push(v);
      }
    }
  }
}

async function* asyncEach(statement) {
  const { getNext, setNext } = resultQueue();
  db.each(statement, setNext).then(() => setNext(null));
  let row;
  // Assuming there are no falsy values passed to `setNext` until the final callback
  // i.e. null return value indicates no more records
  while(row = await getNext()) {
    yield row;
  }
}

(async function(){
  for await(const row of asyncEach('SELECT * FROM table;')) {
    await doSomethingWithRow(row);
  }
}());

@AlttiRi
Copy link

AlttiRi commented Mar 9, 2021

I have also expected non-callback API of this lib.

Maybe I have missed something, but this simple code works for me:

for await (const row of getRows(db, sql)) {
    console.log(JSON.stringify(row));
}

function getRows(db, sql) {
    let resolve;
    let promise = new Promise(res => {
        resolve = res;
    })
    db.each(sql, (err, row) => {
        resolve(row);
        promise = new Promise(res => {
            resolve = res;
        });
    }).then(rowCount => {
        resolve();
    });
    return (async function * generator() {
        while (true) {
            const row = await promise;
            if (row === undefined) {
                break;
            }
            yield row;
        }
    })();
}

@theogravity
Copy link
Collaborator

@AlttiRi Looks interesting. Do you know what the perf / memory impact would be? Biggest concern is possible memory leak due to garbage collection issues.

@AlttiRi
Copy link

AlttiRi commented Mar 10, 2021

const db = await open({
    filename: "history.sqlite", // use Chromium DB
    driver: sqlite3.Database
});

const sql = `SELECT * FROM urls`;
let now, total, loopTime1, loopTime2;

loopTime1 = 0;
for (let i = 0; i < 10; i++) {
    total = 0;
    now = new Date();
    for await (const row of getRows(db, sql)) {
        total += row.url.length; // "useful work"
    }
    const time = new Date() - now;
    loopTime1 += time;
    // console.log(total);
    console.log("time 1", time);
}

loopTime2 = 0;
for (let i = 0; i < 10; i++) {
    total = 0;
    now = new Date();
    await db.each(sql, (err, row) => {
        total += row.url.length;
    });
    const time = new Date() - now;
    loopTime2 += time;
    // console.log(total);
    console.log("time 2", time);
}

console.log(loopTime1); // ~11 secs
console.log(loopTime2); // ~10 secs

Of course, for await would be slower a bit. But it also should be order.
Memory consuming for both cycles are ~ the same.

BTW, PaulKiddle's code does not work at all.

But there is a strange thing, that awaiting inside for await breaks the work. It's really strange.

const sql2 = `SELECT * FROM urls LIMIT 5;`;
for await (const row of getRows(db, sql2)) {
    console.log(row.url); // printed once
    await new Promise(resolve => setTimeout(resolve, 0)); // here
    // await Promise.resolve(); // with it the code works
    console.log(row.url.length); // printed once
}

In fact it just should add pauses (4 ms) between each iteration, but it does not work correctly.
There is only 1 iteration instead of 5 ones.

UPD. Well, possible it's problem of my code. It looks, that it requires to use a queue for the promises storing when consuming (by user's code) is slower that producing (by the library).

@theogravity
Copy link
Collaborator

@AlttiRi yeah, it would definitely need a queue - for each row, add to the queue, then on yield, pop off. Don't think you need to wrap promises around the row results either.

One thing I'm concerned about is memory usage if a queue is kept - there could be a situation where the queue piles up faster than the underlying code that processes the row, but we could mention that in the readme on the downsides of using this approach vs the current one

@AlttiRi
Copy link

AlttiRi commented Mar 30, 2021

Here is the most correct realization (I assume, I did not test it a lot. Just a quick implementation.):

function getRows(db, sql) {
    const queue = new Queue();

    let finished = false;
    let readyResolve;
    let readyPromise = new Promise(res => {
        readyResolve = res;
    });

    db.each(sql, (err, row) => {
        queue.push(row);
        readyResolve();
    }).then(rowCount => {
        finished = true;
        readyResolve();
    });

    return (async function * generator() {
        while (true) {
            await readyPromise;
            const row = queue.shift();
            if (!queue.length && !finished) {
                readyPromise = new Promise(res => {
                    readyResolve = res;
                });
            }
            if (!row) {
                break;
            }
            yield row;
        }
    })();
}

As you can see it uses Queue.

For the case when you consume rows within one task (a task with chained micro-tasks) it's not so important, and it possible to use just Array ([]). (In this case my first variant of getRows works correctly).

for await (const row of getRows(db, sql)) {
    console.log(row);
    await Promise.resolve();
}

But if you handle the rows in different tasks:

for await (const row of getRows(db, sql)) {
    console.log(row);
    await new Promise(resolve => setImmediate(resolve));
}

It requires to store the rows in an array. Pushing and shifting values to the array works significantly slower for "ArrayList" (Array), so it requires to use "LinkedList" (see my Queue).

My implementation:

class Queue {
    length = 0;
    push(value) {
        const newLast = {
            value,
            next: null
        };
        if (!this._last) {
            if (!this._first) {
                this._first = newLast;
            } else {
                this._first.next = newLast;
                this._last = newLast;
            }
        } else {
            this._last.next = newLast;
            this._last = newLast;
        }
        this.length++;
    }
    shift() {
        if (!this.length) {
            return;
        }
        const first = this._first?.value;
        this._first = this._first?.next;
        this.length--;
        if (!this.length) {
            this._last = null;
        }
        return first;
    }
    *[Symbol.iterator]() {
        while (this.length) {
            yield this.shift();
        }
    }
}

The bench code:

const sql = `SELECT * FROM urls`; // 100k+ rows
let loopTime1 = 0, loopTime2 = 0;
for (let i = 0; i < 5; i++) {
    const now = new Date();
    let total = 0;
    for await (const row of getRows(db, sql)) {
        total += 1;
        //await new Promise(resolve => setImmediate(resolve));
    }
    const time = new Date() - now;
    loopTime1 += time;
    console.log(total);
    console.log("time 1", time);
}
for (let i = 0; i < 5; i++) {
    const now = new Date();
    let total = 0;
    await db.each(sql, (err, row) => {
        total += 1;
    });
    const time = new Date() - now;
    loopTime2 += time;
    console.log(total);
    console.log("time 2", time);
}
console.log(loopTime1);
console.log(loopTime2);
console.log(Math.abs(100 - (loopTime1 / loopTime2 * 100)).toFixed(2), "%");

The performance is ~15 % slower than the callback approach.


Also you can uncomment await new Promise(resolve => setImmediate(resolve));. And compare the performance with Queue and Array. With Queue it slower by 50 %, with Array — by 850 %.

@theogravity
Copy link
Collaborator

Thanks for looking into this - I'll check it out over the weekend

@theogravity
Copy link
Collaborator

Here is the most correct realization (I assume, I did not test it a lot. Just a quick implementation.):

function getRows(db, sql) {
    const queue = new Queue();

    let finished = false;
    let readyResolve;
    let readyPromise = new Promise(res => {
        readyResolve = res;
    });

    db.each(sql, (err, row) => {
        queue.push(row);
        readyResolve();
    }).then(rowCount => {
        finished = true;
        readyResolve();
    });

    return (async function * generator() {
        while (true) {
            await readyPromise;
            const row = queue.shift();
            if (!queue.length && !finished) {
                readyPromise = new Promise(res => {
                    readyResolve = res;
                });
            }
            if (!row) {
                break;
            }
            yield row;
        }
    })();
}

As you can see it uses Queue.

For the case when you consume rows within one task (a task with chained micro-tasks) it's not so important, and it possible to use just Array ([]). (In this case my first variant of getRows works correctly).

for await (const row of getRows(db, sql)) {
    console.log(row);
    await Promise.resolve();
}

But if you handle the rows in different tasks:

for await (const row of getRows(db, sql)) {
    console.log(row);
    await new Promise(resolve => setImmediate(resolve));
}

It requires to store the rows in an array. Pushing and shifting values to the array works significantly slower for "ArrayList" (Array), so it requires to use "LinkedList" (see my Queue).

My implementation:

class Queue {
    length = 0;
    push(value) {
        const newLast = {
            value,
            next: null
        };
        if (!this._last) {
            if (!this._first) {
                this._first = newLast;
            } else {
                this._first.next = newLast;
                this._last = newLast;
            }
        } else {
            this._last.next = newLast;
            this._last = newLast;
        }
        this.length++;
    }
    shift() {
        const first = this._first?.value;
        this._first = this._first?.next;
        this.length--;
        return first;
    }
}

The bench code:

const sql = `SELECT * FROM urls`; // 100k+ rows
let loopTime1 = 0, loopTime2 = 0;
for (let i = 0; i < 5; i++) {
    const now = new Date();
    let total = 0;
    for await (const row of getRows(db, sql)) {
        total += 1;
        //await new Promise(resolve => setImmediate(resolve));
    }
    const time = new Date() - now;
    loopTime1 += time;
    console.log(total);
    console.log("time 1", time);
}
for (let i = 0; i < 5; i++) {
    const now = new Date();
    let total = 0;
    await db.each(sql, (err, row) => {
        total += 1;
    });
    const time = new Date() - now;
    loopTime2 += time;
    console.log(total);
    console.log("time 2", time);
}
console.log(loopTime1);
console.log(loopTime2);
console.log(Math.abs(100 - (loopTime1 / loopTime2 * 100)).toFixed(2), "%");

The performance is ~15 % slower than the callback approach.

Also you can uncomment await new Promise(resolve => setImmediate(resolve));. And compare the performance with Queue and Array. With Queue it slower by 50 %, with Array — by 850 %.

@AlttiRi The code looks good. I had to play around with it myself to understand what it's doing as I never write generators. I'm still trying to figure out if the queue should be pluggable, as in the end-user specifies what kind of queue they want to use for this as different queues may have different performance characteristics.

I'll try to bake this in for next weekend as I need to spend time writing tests / testing it out. Thanks!

@brerneto
Copy link

brerneto commented Jul 18, 2022

Why is there not a completed callback? An async queue is easy to do myself but I need to know when to end.

Edit: Sorry, just found the db property

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants