diff --git a/README.md b/README.md index 6f2c97b..507322e 100644 --- a/README.md +++ b/README.md @@ -87,14 +87,9 @@ from MySQL to PostgreSQL as easy and smooth as possible.

Note: "logs_directory" will be created during script execution.

VERSION

-

Current version is 5.1.0
+

Current version is 5.2.0
(major version . improvements . bug fixes)

-

KNOWN ISSUES

- -

LICENSE

NMIG is available under "GNU GENERAL PUBLIC LICENSE" (v. 3)
http://www.gnu.org/licenses/gpl.txt.

diff --git a/config/config.json b/config/config.json index 7d867bc..8dd6b37 100644 --- a/config/config.json +++ b/config/config.json @@ -51,6 +51,12 @@ ], "loader_max_old_space_size" : "DEFAULT", + "streams_high_water_mark_description": [ + "Buffer level when stream.write() starts returning false.", + "This number is a number of JavaScript objects." + ], + "streams_high_water_mark": 16384, + "encoding_description" : [ "JavaScript encoding type.", "If not supplied, then utf8 will be used as a default." diff --git a/package.json b/package.json index effd3ed..fcee297 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nmig", - "version": "5.1.0", + "version": "5.2.0", "description": "The database migration app", "author": "Anatoly Khaytovich", "license": "GPL-3.0", diff --git a/src/Conversion.ts b/src/Conversion.ts index a4e4f76..ebadf55 100644 --- a/src/Conversion.ts +++ b/src/Conversion.ts @@ -190,6 +190,12 @@ export default class Conversion { */ public _dataTypesMap: any; + /** + * Buffer level when stream.write() starts returning false. + * This number is a number of JavaScript objects. + */ + public readonly _streamsHighWaterMark: number; + /** * Constructor. */ @@ -215,6 +221,8 @@ export default class Conversion { this._dataPool = []; this._dicTables = Object.create(null); this._mySqlDbName = this._sourceConString.database; + this._streamsHighWaterMark = this._config.streams_high_water_mark === undefined ? 16384 : +this._config.streams_high_water_mark; + this._schema = this._config.schema === undefined || this._config.schema === '' ? this._mySqlDbName : this._config.schema; diff --git a/src/DataLoader.ts b/src/DataLoader.ts index b474f53..e2b2c02 100644 --- a/src/DataLoader.ts +++ b/src/DataLoader.ts @@ -136,8 +136,7 @@ async function populateTableWorker( originalSessionReplicationRole ); - const streamsHighWaterMark: number = 16384; // Commonly used as the default, but may vary across different machines. - const json2csvStream = await getJson2csvStream(conv, originalTableName, streamsHighWaterMark, dataPoolId, client, originalSessionReplicationRole); + const json2csvStream = await getJson2csvStream(conv, originalTableName, dataPoolId, client, originalSessionReplicationRole); const mysqlClientErrorHandler = async (err: string) => { await processDataError(conv, err, sql, sqlCopy, tableName, dataPoolId, client, originalSessionReplicationRole); }; @@ -145,7 +144,7 @@ async function populateTableWorker( mysqlClient .query(sql) .on('error', mysqlClientErrorHandler) - .stream({ highWaterMark: streamsHighWaterMark }) + .stream({ highWaterMark: conv._streamsHighWaterMark }) .pipe(json2csvStream) .pipe(copyStream); } @@ -186,7 +185,6 @@ function getCopyStream( async function getJson2csvStream( conversion: Conversion, originalTableName: string, - streamsHighWaterMark: number, dataPoolId: number, client: PoolClient, originalSessionReplicationRole: string | null @@ -208,13 +206,13 @@ async function getJson2csvStream( fields: tableColumnsResult.data.map((column: any) => column.Field) }; - const transformOptions: any = { - highWaterMark: streamsHighWaterMark, + const streamTransformOptions: any = { + highWaterMark: conversion._streamsHighWaterMark, objectMode: true, encoding: conversion._encoding }; - const json2CsvTransformStream = new Json2CsvTransform(options, transformOptions); + const json2CsvTransformStream = new Json2CsvTransform(options, streamTransformOptions); json2CsvTransformStream.on('error', async (transformError: string) => { await processDataError(conversion, transformError, '', '', originalTableName, dataPoolId, client, originalSessionReplicationRole);