Hi there again forum!
I am replicating a table from a MySQL server resource into BigQuery using Retool Workflows and bulk inserts. For this, I have created one workflow that replicates a single date from the MySQL table, and a parent workflow that calls it repeatedly over a date range, all sequentially. Sometimes when executing the parent workflow I get the following error on some dates:
I would really appreciate any insight on this, as well as any feedback on my implementation. I have tried to stick to the Workflow Best Practices outlined in the docs as closely as I can, but each workflow run takes between 100 and 300 MB of memory. I have the feeling this could be more efficient and shouldn't take this much memory or execution time, but I'm somewhat new to Retool and to coding in JS, so my expertise is limited.
The following are pictures of both workflows:
This is the general boilerplate code inside the transform block; the only difference is that I use the actual field names and data types for my use case:
// Get the array of records from the SQL import query
const dataArray = sql_import.data;

// Define the expected BigQuery data type for each field
const dataTypesDict = {
  'field1': 'INT64',
  'field2': 'FLOAT64',
  'field3': 'STRING',
};

// Iterate through each record
dataArray.forEach(record => {
  // Iterate through each field in the record
  for (let field in record) {
    if (record[field] === null) {
      // Replace nulls with an object containing the expected data type and a null value
      record[field] = { RT$BQ_TYPE: dataTypesDict[field], value: null };
    } else if (dataTypesDict[field] === 'STRING') {
      // Ensure that STRING fields are properly typed as strings
      record[field] = { RT$BQ_TYPE: dataTypesDict[field], value: String(record[field]) };
    }
  }
});

// Return the modified data array
return dataArray;
The rest of the workflow:
const results = [];
const batchSize = 20; // Number of batches triggered in parallel per group
const pauseDuration = 15000; // 15-second delay between batch groups

console.log(`Inserting records from date ${startTrigger.data.date}`);

// Loop through the batched data in groups of `batchSize` batches at a time
for (let i = 0; i < create_batches.data.length; i += batchSize) {
  // Extract the next group of batches for parallel processing
  const batch = create_batches.data.slice(i, i + batchSize);
  console.log(`Processing batch group starting at index ${i + 1}...`);

  // Execute all the queries in parallel for the current group
  const batchResults = await Promise.all(
    batch.map((value) => {
      // Trigger the lambda function with the current `value`
      return batch_loop_lambda.trigger({ value });
    })
  ).catch(error => {
    console.error(`Error processing batch group starting at index ${i + 1}: ${error.message}`);
    return [];
  });

  // Collect the results from this group
  results.push(...batchResults);

  // If there are more batches to process, pause before starting the next group
  if (i + batchSize < create_batches.data.length) {
    console.log(`Pausing for 15 seconds after processing ${i + batchSize} batches...`);
    await delay(pauseDuration); // 15-second delay (15000 milliseconds)
  }
}

console.log("All batch groups processed.");
console.log(`Pausing for 15 seconds after processing the last batch group...`);
await delay(pauseDuration); // 15-second delay (15000 milliseconds)
return results;
Finally, the delay and insert batch functions look like this:
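The delay function is essentially just a promise-wrapped setTimeout, something along these lines:

// Delay helper: resolves after the given number of milliseconds
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));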
The parent workflow receives a start date and an end date, builds a list of all dates within that range, and calls the above workflow for each date sequentially, waiting for each run to finish before triggering the next one:
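Roughly, the loop block in the parent workflow does the following (parameter names like start_date/end_date and the block name replicate_date_fn are simplified placeholders for how the child workflow is actually triggered):

// Build the inclusive list of dates between the start and end date
const start = new Date(startTrigger.data.start_date);
const end = new Date(startTrigger.data.end_date);

const runs = [];
for (let d = new Date(start); d <= end; d.setDate(d.getDate() + 1)) {
  const date = d.toISOString().slice(0, 10); // YYYY-MM-DD
  console.log(`Replicating date ${date}`);
  // Wait for the child workflow run to finish before moving on to the next date
  runs.push(await replicate_date_fn.trigger({ date }));
}
return runs;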
Thanks for reading this far!