
Standardize bulk 1 and 2 APIs exposing "streams" to bulk2.query #1317

Open · wants to merge 6 commits into base: 2.0
Conversation

@AllanOricil commented Mar 23, 2023

BEFORE

SOQL queries that return a huge number of rows could throw FATAL ERROR ... JavaScript heap out of memory, because bulk2.query stores all records in memory.


AFTER

Query results can now be retrieved in batches whose size equals the value passed to maxRecords.

Note: there isn't a real stream from Salesforce to jsforce, because Bulk API v2 does not offer one. These changes just push result pages into a stream so that the v2 API matches v1's.

Note: page results are not kept in memory unless the developer chooses to, by concatenating the results from every page.

import fs from 'fs';
import path from 'path';
import { pipeline } from 'stream/promises';
import { Connection } from 'jsforce';

try {
  // CONFIG and PATH are placeholders for your connection options and output file path
  const conn = new Connection(CONFIG);

  // the default value for maxRecords is 10000
  const queryJob = await conn.bulk2.query(`SELECT Id, Name FROM Account`);

  // each page of records is pushed to the readable stream as it arrives
  const readStream = queryJob.stream();
  const writeStream = fs.createWriteStream(path.resolve(PATH));

  await pipeline(readStream, writeStream);
} catch (e) {
  throw new Error('Something went wrong');
}

Or, if developers want to gather all records in memory before doing something with them:

import { Connection } from 'jsforce';

// collects every batch emitted by the stream into a single array
function getRecords(readStream) {
  return new Promise((resolve, reject) => {
    let records = [];
    readStream
      .on('data', (data) => (records = records.concat(data)))
      .on('error', (error) => reject(error))
      .on('close', () => resolve(records));
  });
}

try {
  const conn = new Connection(CONFIG);
  const queryJob = await conn.bulk2.query(`SELECT Id, Name FROM Account`);
  const readStream = queryJob.stream();
  const records = await getRecords(readStream);
} catch (e) {
  throw new Error('Something went wrong');
}

@AllanOricil AllanOricil marked this pull request as draft March 23, 2023 04:14
@AllanOricil AllanOricil marked this pull request as ready for review March 23, 2023 04:15
…now the data is piped to a Readable stream. Change the way maxRecords is converted to string
- create getRecords to remove code repetition
- rename tests with a better description of what they do
… a query param in the request url

- add a counter for the number of batches
- rename the counter that counts the number of records from numberOfRecordsProcessed to numberRecordsRetrieved

private async *getResults(): AsyncGenerator<Record[], void, unknown> {
  while (this.locator !== 'null') {
    const nextResults = await this.request<Record[]>({
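For readers unfamiliar with the pattern, here is a minimal sketch (not the PR's actual code) of how an async generator of record pages can be exposed as a Readable stream; fetchPage and its return shape are hypothetical placeholders:

import { Readable } from 'stream';

// placeholder for a request that fetches one page of results from the Bulk API v2 job
async function fetchPage(locator) {
  // ...an HTTP call using `locator` as the results locator would go here
  return { records: [], nextLocator: undefined };
}

// hypothetical generator: yields one page of records at a time, accumulating nothing
async function* pages() {
  let locator;
  do {
    const { records, nextLocator } = await fetchPage(locator);
    yield records;
    locator = nextLocator;
  } while (locator && locator !== 'null');
}

// Readable.from turns the generator into an object-mode stream;
// each yielded page becomes one 'data' chunk that consumers can pipe or collect.
const readStream = Readable.from(pages());
readStream.on('data', (page) => console.log(`received ${page.length} records`));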
Contributor

I would rather you name this method something different, like getResultsGenerator, and still have a public getResults() method that does:

return new Promise((resolve) => {
  let records = [];
  this.stream()
    .on('data', (data) => {
      records = records.concat(data);
    })
    .on('end', () => {
      resolve(records);
    });
});

That way everyone who is used to the getResults method can still access it from the query job and doesn't have to write it themselves. Migrating from v1 to v2 would be easier, as it would only require changing bulk.query() to bulk2.query().getResults().
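For illustration, a short sketch of how the two styles could look if the proposed public getResults() wrapper were added (the wrapper is only a suggestion at this point, and CONFIG is a placeholder):

import { Connection } from 'jsforce';

const conn = new Connection(CONFIG);
const queryJob = await conn.bulk2.query('SELECT Id, Name FROM Account');

// Option A: stream the pages, as this PR enables
// queryJob.stream().on('data', (page) => { /* handle one page of records */ });

// Option B: collect everything in memory via the proposed wrapper
const allRecords = await queryJob.getResults();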

Contributor

Keep in mind, I am no longer a maintainer on this project, so I would leave it up to @stomita or @mshanemc

Author

I removed the getRecords() method because bulk v1 does not have it, and because I wanted to make v2's API equal to v1's. Another good reason for not creating this method is that it could cause Fatal error: JavaScript heap out of memory if used to query huge data sets without the LIMIT keyword.

Member

@AllanOricil Even though jsforce v2 is still in beta, there are a lot of projects using it, so I would prefer not to break the current bulk v2 implementation. What do you think about deprecating this and adding an option to the query job, then making that the default at v2 GA?

Maybe a third arg to BulkV2.query that makes it return the job instead of the results could work, defaulting to false.
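In code, the suggestion might look something like this (the flag position and default are hypothetical; conn and CONFIG are placeholders as in the examples above):

import { Connection } from 'jsforce';

const conn = new Connection(CONFIG);

// current/default behavior (flag omitted or false): resolves with all the records
const records = await conn.bulk2.query('SELECT Id, Name FROM Account');

// opt-in behavior: resolves with the query job so callers can stream pages instead
const queryJob = await conn.bulk2.query('SELECT Id, Name FROM Account', undefined, true);
const readStream = queryJob.stream();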

Author

Np. I'm going to add this arg 👍

@AllanOricil changed the title: add streams to bulk2.query → add "streams" to bulk2.query (Apr 29, 2023)
@AllanOricil reopened this Jul 28, 2023
@AllanOricil changed the title: add "streams" to bulk2.query → Standardize bulk 1 and 2 APIs exposing "streams" to bulk2.query (Nov 13, 2023)
@cristiand391 (Member)

Hey @AllanOricil, I opened a PR a few days ago to fix this in v3:
#1397

At first I was about to merge your PR and start from there, but after playing a bit with the record stream system in jsforce I decided to start from scratch (we had some planned breaking changes in bulk2 too).

After it gets merged we'll start using jsforce v3 in the CLI, but this issue will not be solved because data query --bulk --json includes the records in the JSON output, so it still needs to collect them all in memory. Thanks for the help!

@AllanOricil (Author) commented Feb 15, 2024

If you are going to load the whole file in memory, do some additional checks to avoid unnecessary processing.

Something like this:

  1. Get the file's fstats to determine its size.
  2. Get the maximum memory available to the runtime.
  3. Build a threshold from these two numbers, plus a margin for error.
  4. Use all of that to throw an Error before running the job, e.g.
     if (!canProcess(...)) throw new Error("Sorry, you can't use this file because of XYZ. Please do something like bla and then try again.")

This way users won't have to wait for memory to fill up before they see an error (see the sketch below).
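A minimal sketch of such a pre-flight check; canProcess, the safety factor, and the use of V8 heap statistics are assumptions for illustration, not code from this PR or jsforce, and PATH is a placeholder:

import fs from 'fs';
import v8 from 'v8';

// hypothetical helper: returns false when the file is unlikely to fit in the heap
function canProcess(filePath, safetyFactor = 0.9) {
  const fileSize = fs.statSync(filePath).size; // 1. file size from its stats
  const { heap_size_limit, used_heap_size } = v8.getHeapStatistics(); // 2. memory available to the runtime
  const available = (heap_size_limit - used_heap_size) * safetyFactor; // 3. threshold with a margin for error
  return fileSize <= available;
}

// 4. fail fast, before running the job
if (!canProcess(PATH)) {
  throw new Error(
    'This file is too large to load in memory. Reduce it (e.g. split it) and try again.',
  );
}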
