
Make abstract Bulk API methods better! #1370

Open
msouther-lisc opened this issue Oct 6, 2023 · 2 comments
Labels
docs Documentation

Comments

msouther-lisc commented Oct 6, 2023

Methods like Bulk.prototype.query and SObject.prototype.upsertBulk are very helpful in that they abstract away a lot of the complexity of creating jobs and batches, dealing with certain events and streaming logic, etc.

Ideally, the developer should never be obligated to think of Salesforce record data in Node.js as anything other than simple arrays of JavaScript objects, and they should be able to use async/await to send and receive those arrays.

But there are some critical gaps to achieving this ideal state:

  • Bulk.prototype.query doesn't return a Promise<Record[]> but a Parsable<Record>. You have to listen for a record event on that object, which takes us away from simple async/await statements (see the sketch after this list).
  • There is no way to capture the job ID when using these abstract methods. If the Node process fails -- or more likely, times out -- the job can't be resumed. (At least, not without some manual intervention, probably involving grabbing the ID from the Bulk Data Load Jobs section of the Setup menu in Salesforce.)
  • Even if there were a way to get the job ID while awaiting these methods, there are no equivalent abstract "resume" methods that return a Promise<Record[]> or Promise<BulkIngestBatchResult>. You have to deal with polling, events, and streaming to get your results back.
  • In the case of DML jobs (insert, update, upsert, delete, hard delete), if you have more than 10,000 records (the maximum for a bulk batch), there is no way to instruct jsforce to intelligently chunk the records into multiple batches under the same job. Again, you have to re-engineer the abstract methods to achieve this.
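
For reference, this is roughly what consuming Bulk.prototype.query looks like today -- you end up wrapping the event-based stream in a promise yourself (a rough sketch; the connection setup and query are placeholders):

import jsforce, { Record } from "jsforce";

const { instanceUrl, accessToken } = process.env;
const conn = new jsforce.Connection({ instanceUrl, accessToken });

// Today: collect records by hand from the "record" event stream
const accounts: Record[] = await new Promise<Record[]>((resolve, reject) => {
  const records: Record[] = [];
  conn.bulk
    .query("SELECT Id, Type FROM Account WHERE Type = NULL")
    .on("record", (rec: Record) => records.push(rec))
    .on("error", reject)
    .on("end", () => resolve(records));
});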

These abstract methods were built for simplicity, and they have amazing potential. They just need a few extra API features to be more broadly useful:

  • The ability to query and get a Record[] response
  • The ability to pass a callback function that gets called with the jobId (or maybe the entire JobInfo?) as soon as the job is opened. This jobId can be stored in a local file, a database, etc. for later use in case the Node process times out or crashes.
  • Abstract "resume" methods that -- given a jobId alone -- resume polling on the job and return the same kind of data as the corresponding methods that start those jobs.
  • For DML jobs, an option to specify batch size for auto-chunking

Some sample TypeScript code to illustrate usage:

import jsforce, { Record } from "jsforce";
import {
  readJSONSync,
  outputJSONSync,
  pathExistsSync,
  removeSync,
} from "fs-extra";
import "dotenv/config";
import { handleResults } from "./resultHandler";
import type { BulkIngestBatchResult } from "jsforce/lib/api/bulk";

const { instanceUrl, accessToken } = process.env;
const LOCAL_FILE_PATH = "./pendingProcess.json";

type Process = { queryJobId: string | null; updateJobId: string | null };
let pendingProcess: Process = { queryJobId: null, updateJobId: null };

// If we cached a pending process during the last run...
if (pathExistsSync(LOCAL_FILE_PATH)) {
  // Get cached pending process info from a local file
  pendingProcess = readJSONSync(LOCAL_FILE_PATH);
}

(async () => {
  // Create connection
  const conn = new jsforce.Connection({ instanceUrl, accessToken });
  let accounts: Record[] = [];

  // Create a callback to be invoked when the query job gets an ID
  const queryJobOpenCallback = (jobId: string) => {
    pendingProcess.queryJobId = jobId;

    // Update the local file for possible later reference
    outputJSONSync(LOCAL_FILE_PATH, pendingProcess);
  };

  let results: BulkIngestBatchResult;

  // As long as there is no pending update job in the queue
  if (!pendingProcess.updateJobId) {
    // If there are no pending jobs at all, start a new query job
    if (!pendingProcess.queryJobId) {
      // New abstract method that returns a Promise<Record[]>
      accounts = await conn.bulk.queryRecords(
        "SELECT Id, Type FROM Account WHERE Type = NULL",
        { jobOpenCallback: queryJobOpenCallback }
      );
    }

    // If there is a pending query job, resume it using the job ID
    else {
      // New abstract method that returns a Promise<Record[]>
      accounts = await conn.bulk.resumeQuery(pendingProcess.queryJobId);
    }

    // Update the account data
    for (const account of accounts) {
      account.Type = "Other";
    }

    // Create a callback to be invoked when the update job gets an ID
    const updateJobOpenCallback = (jobId: string) => {
      pendingProcess.updateJobId = jobId;

      // Update the local file for possible later reference
      outputJSONSync(LOCAL_FILE_PATH, pendingProcess);
    };

    // New abstract method that returns a Promise<BulkIngestBatchResult>
    results = await conn.bulk.updateRecords("Account", accounts, {
      batchSize: 5000,
      jobOpenCallback: updateJobOpenCallback,
    });
  } else {
    // New abstract method that returns a Promise<BulkIngestBatchResult>
    results = await conn.bulk.resumeUpdate(pendingProcess.updateJobId);
  }

  // Do something with the results
  await handleResults(results);

  // All done! We can remove the file that tracks our pending process. Next
  // time the process runs, it will start afresh.
  removeSync(LOCAL_FILE_PATH);
})();
cristiand391 (Member) commented:

Hey, we are refactoring the bulk v1 & v2 wrappers for jsforce v3 here:
#1397

Bulk.prototype.query doesn't return a Promise<Record[]> but a Parsable<Record>. You have to listen for a record event on that object, which takes us away from simple async/await statements.

the initial bulk v2 implementation added in jsforce v2 does that (see the async query(...) method in the bulk2 source), but unfortunately that leads to memory issues when working with big chunks of records (which is expected if you are using bulk APIs):
forcedotcom/cli#1995

we are reverting it back to returning a record stream in v3; the stream API is better suited to this kind of wrapper, which deals with tons of data.
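
For example, the record stream can be piped straight to disk so the full result set never has to be buffered in memory (a rough sketch; the query and file path are placeholders):

import fs from "fs";
import jsforce from "jsforce";

const { instanceUrl, accessToken } = process.env;
const conn = new jsforce.Connection({ instanceUrl, accessToken });

// Stream bulk query results straight to a CSV file instead of building a Record[]
conn.bulk
  .query("SELECT Id, Type FROM Account WHERE Type = NULL")
  .stream() // convert the record stream into a CSV data stream
  .pipe(fs.createWriteStream("./accounts.csv"));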

There is no way to capture the job ID when using these abstract methods. If the Node process fails -- or more likely, times out -- the job can't be resumed. (At least, not without some manual intervention, probably involving grabbing the ID from the Bulk Data Load Jobs section of the Setup menu in Salesforce.)

jsforce v2 added an open event that includes the jobInfo when creating a bulk v1 job (bulk2 has it too, but it was missing the jobInfo payload; adding it now for v3).

This will let you keep the job ID and manually call bulk.job to get a job instance, but you will have to start polling manually and request the results yourself (most methods cover this; we'll add more examples to the docs).
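
A rough sketch of that flow with the bulk v1 wrapper (the file path, sobject, records, and polling values are placeholders):

import jsforce, { Record } from "jsforce";
import { outputJSONSync, readJSONSync } from "fs-extra";

const { instanceUrl, accessToken } = process.env;
const conn = new jsforce.Connection({ instanceUrl, accessToken });
const accounts: Record[] = [{ Id: "001xx000003DGb1AAG", Type: "Other" }]; // placeholder data

// First run: create the job/batch yourself so the IDs can be captured and persisted
const job = conn.bulk.createJob("Account", "update");
job.on("open", (jobInfo) => {
  // Persist the job ID somewhere durable in case the process dies
  outputJSONSync("./pendingProcess.json", { jobId: jobInfo.id });
});
const batch = job.createBatch();
batch.execute(accounts);
batch.on("queue", (batchInfo) => {
  // Keep the batch ID too so the batch can be re-polled later
  outputJSONSync("./pendingProcess.json", { jobId: batchInfo.jobId, batchId: batchInfo.id });
  batch.poll(5000, 600000); // poll every 5s, time out after 10 min
});
batch.on("response", (rets) => console.log(rets));

// Later run: reattach by ID, poll manually, and request the results
const { jobId, batchId } = readJSONSync("./pendingProcess.json");
const resumedBatch = conn.bulk.job(jobId).batch(batchId);
resumedBatch.poll(5000, 600000);
resumedBatch.on("response", (rets) => console.log(rets));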

Even if there were a way to get the job ID while awaiting these methods, there are no equivalent abstract "resume" methods that return a Promise<Record[]> or Promise<BulkIngestBatchResult>. You have to deal with polling, events and streaming to get your results back.

This would require jsforce to cache job results on the user's machine, which IMO is not something a library should deal with.

In the case of DML jobs (insert, update, upsert, delete, hard delete), if you have more than 10,000 records (the maximum for a bulk batch), there is no way to instruct jsforce to intelligently chunk the records into multiple batches under the same job. Again, you have to re-engineer the abstract methods to achieve this.

This could work for Bulk v1 (see the sketch after the error example below).

The Bulk V2 API doesn't let you create batches yourself; if you try to push different record sets to the batches endpoint of an open job, you get this error:

BULK_API_ERROR: Found multiple contents for job: <7508D0000031zSB>, please 'Close' / 'Abort' / 'Delete' the current Job then create a new Job and make sure you only do 'PUT' once on a given Job.
    at BulkApiV2.getError (/Users/cdominguez/code/gh/jsforce/lib/http-api.js:278:16)

Job data can be uploaded only once; the org then creates batches internally as needed.
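
For Bulk v1, user code (or a wrapper) can already spread records across multiple batches of the same job; a rough sketch (chunk size, sobject, and polling values are placeholders):

import jsforce, { Record } from "jsforce";

const { instanceUrl, accessToken } = process.env;
const conn = new jsforce.Connection({ instanceUrl, accessToken });

// Split records into <= 10,000-record chunks and run each chunk as a batch of one v1 job
async function updateInChunks(records: Record[], chunkSize = 10000) {
  const job = conn.bulk.createJob("Account", "update");
  const chunks: Record[][] = [];
  for (let i = 0; i < records.length; i += chunkSize) {
    chunks.push(records.slice(i, i + chunkSize));
  }
  const results = await Promise.all(
    chunks.map(
      (chunk) =>
        new Promise<any[]>((resolve, reject) => {
          const batch = job.createBatch();
          batch.execute(chunk);
          batch.on("queue", () => batch.poll(5000, 600000)); // poll every 5s, 10 min timeout
          batch.on("response", resolve);
          batch.on("error", reject);
        })
    )
  );
  await job.close();
  return results.flat();
}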

msouther-lisc (Author) commented:

Thanks @cristiand391 !

If the memory issues are bad enough that jsforce can only really support streaming, that's valid. I suppose it moots most of my recommendations.

Since Bulk API V2 has very different batching behavior, it may not be necessary to add batch-size specification; most clients will probably opt for Bulk API V2 once it's available in jsforce. I have only been using Bulk API V1 because that is the only bulk API jsforce has supported in the past.

I do agree that documentation and examples could be improved. For example, the scenario in my original post: a bulk query followed by a bulk update. A good example would show how to structure the code to avoid memory issues with large data volumes, how to cache job IDs and resume polling if something (e.g. a timeout) interrupts the initial script run, and so on.
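
For instance, something along these lines (a rough sketch against the current Bulk v1 wrapper; file paths, query, and polling values are placeholders) would cover the memory side of that scenario by staging data in CSV files instead of in-memory arrays:

import fs from "fs";
import jsforce from "jsforce";

const { instanceUrl, accessToken } = process.env;
const conn = new jsforce.Connection({ instanceUrl, accessToken });

(async () => {
  // 1. Stream the query results to disk so large result sets never sit in memory
  await new Promise<void>((resolve, reject) => {
    conn.bulk
      .query("SELECT Id, Type FROM Account WHERE Type = NULL")
      .stream() // CSV data stream
      .pipe(fs.createWriteStream("./accounts.csv"))
      .on("finish", resolve)
      .on("error", reject);
  });

  // 2. Transform ./accounts.csv into ./accounts-updated.csv here (omitted)

  // 3. Feed the updated CSV back in as the input of a bulk update
  const batch = conn.bulk.load(
    "Account",
    "update",
    fs.createReadStream("./accounts-updated.csv")
  );
  batch.on("queue", () => batch.poll(5000, 600000)); // poll every 5s, 10 min timeout
  const results = await new Promise((resolve, reject) => {
    batch.on("response", resolve);
    batch.on("error", reject);
  });
  console.log(results);
})();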

I'm glad SFDC has taken over jsforce and a v3 is in the works. jsforce has been underinvested and behind the curve for some time now. Now that Node.js is a core technology in the Salesforce dev stack, the need for an excellent JS library for working with Salesforce data has never been greater.
