Commit b347e22: Merge branch 'release/v5.0.0'
AnatolyUss committed Oct 11, 2019
2 parents: 6332308 + db4ab9f
Showing 19 changed files with 356 additions and 810 deletions.
8 changes: 3 additions & 5 deletions README.md
@@ -11,10 +11,8 @@ from MySQL to PostgreSQL as easy and smooth as possible.</p>
 indexes, primary and foreign keys exactly as they were before migration.</li>
 
 <li>Ability to rename tables and columns during migration.</li>
-<li>Ability to recover migration process if disaster took place (without restarting from the beginning).</li>
-<li>Ability to migrate big databases - in order to eliminate "process out of memory" issues NMIG will split each table's data into several chunks.<br>Each group of chunks will be loaded via separate worker process.</li>
-
-<li> Speed of data transfer - in order to migrate data fast NMIG uses PostgreSQL COPY protocol.</li>
+<li>Ability to recover migration process if disaster took place without restarting from the beginning.</li>
+<li>Ability to migrate big databases fast - in order to migrate data NMIG utilizes PostgreSQL COPY protocol.</li>
 <li>Ease of monitoring - NMIG will provide detailed output about every step, it takes during the execution.</li>
 <li>
     Ease of configuration - all the parameters required for migration should be put in one single JSON document.
@@ -89,7 +87,7 @@ from MySQL to PostgreSQL as easy and smooth as possible.</p>
 <b>Note:</b> "logs_directory" will be created during script execution.</p>
 
 <h3>VERSION</h3>
-<p>Current version is 4.0.2<br />
+<p>Current version is 5.0.0<br />
 (major version . improvements . bug fixes)</p>
 
 <h3>KNOWN ISSUES</h3>
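The feature list above now attributes migration speed to the PostgreSQL COPY protocol rather than chunked loading. For orientation only (this sketch is not code from the commit), here is a minimal COPY-based load written against pg and pg-copy-streams, both of which are pinned in this release's package.json; the table and column names are invented:

import { Pool, PoolClient } from 'pg';
import { Readable } from 'stream';

// pg-copy-streams v2 ships without bundled typings, hence require().
const copyFrom = require('pg-copy-streams').from;

// Streams a CSV payload into PostgreSQL via the COPY protocol.
// COPY ingests the whole payload in a single round trip, which is
// what makes it much faster than row-by-row INSERT statements.
async function copyCsvIntoTable(pool: Pool, csvPayload: string): Promise<void> {
    const client: PoolClient = await pool.connect();

    try {
        await new Promise<void>((resolve, reject) => {
            const copyStream = client.query(copyFrom('COPY my_table (id, name) FROM STDIN WITH CSV'));
            copyStream.on('error', reject);
            copyStream.on('end', resolve); // pg-copy-streams v2 emits 'end' on completion.

            const source = new Readable({ read() {} });
            source.push(csvPayload);
            source.push(null); // Signal end of input.
            source.pipe(copyStream);
        });
    } finally {
        client.release();
    }
}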
8 changes: 1 addition & 7 deletions config/config.json
@@ -50,7 +50,7 @@
 
     "encoding_description" : [
        "JavaScript encoding type.",
-       "If not supplied, then utf-8 will be used as a default."
+       "If not supplied, then utf8 will be used as a default."
     ],
     "encoding" : "utf8",
 
@@ -60,12 +60,6 @@
     ],
     "schema" : "public",
 
-    "data_chunk_size_description" : [
-       "During migration each table's data will be split into chunks not larger than data_chunk_size (in MB).",
-       "If not supplied, then 10 MB will be used as a default."
-    ],
-    "data_chunk_size" : 10,
-
     "no_vacuum_description" : [
        "PostgreSQL VACUUM reclaims storage occupied by dead tuples.",
        "VACUUM is a very time-consuming procedure.",
45 changes: 35 additions & 10 deletions package-lock.json

Some generated files are not rendered by default.

9 changes: 5 additions & 4 deletions package.json
@@ -1,6 +1,6 @@
 {
     "name": "nmig",
-    "version": "4.0.2",
+    "version": "5.0.0",
     "description": "The database migration app",
     "author": "Anatoly Khaytovich<[email protected]>",
     "license": "GPL-3.0",
@@ -12,17 +12,18 @@
         "node": ">=8.0.0"
     },
     "dependencies": {
+        "json2csv": "^4.5.3",
         "mysql": "^2.17.1",
         "pg": "^7.12.1",
         "pg-copy-streams": "^2.2.2"
     },
     "devDependencies": {
         "@types/mysql": "^2.15.7",
-        "@types/node": "^12.7.5",
-        "@types/pg": "^7.11.1",
+        "@types/node": "^12.7.12",
+        "@types/pg": "^7.11.2",
         "@types/tape": "^4.2.33",
         "tape": "^4.11.0",
-        "typescript": "^3.6.3"
+        "typescript": "^3.6.4"
     },
     "scripts": {
         "build": "tsc",
2 changes: 1 addition & 1 deletion src/BinaryDataDecoder.ts
@@ -43,7 +43,7 @@ export default async function (conversion: Conversion): Promise<Conversion> {
 
     if (result.error) {
         // No need to continue if no 'bytea' or 'geometry' columns found.
-        dbAccess.releaseDbClient(<PoolClient>result.client);
+        await dbAccess.releaseDbClient(<PoolClient>result.client);
         return conversion;
     }
 
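The single change above awaits releaseDbClient, so the decoder no longer returns before its client is actually handed back. As a rough sketch of what an awaitable release helper might look like (the method name comes from this diff; the body is an assumption, built on node-postgres' synchronous release()):

import { PoolClient } from 'pg';

// Wrapping the synchronous release() in an async method lets callers
// await it, keeping teardown strictly ordered with surrounding steps.
export async function releaseDbClient(client?: PoolClient): Promise<void> {
    try {
        if (client) {
            client.release();
        }
    } catch (error) {
        console.error(`releaseDbClient failed: ${ error }`);
    }
}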
68 changes: 0 additions & 68 deletions src/BufferStream.ts

This file was deleted.

103 changes: 27 additions & 76 deletions src/ConsistencyEnforcer.ts
@@ -22,88 +22,39 @@ import DBAccessQueryResult from './DBAccessQueryResult';
 import Conversion from './Conversion';
 import DBAccess from './DBAccess';
 import DBVendors from './DBVendors';
-import * as extraConfigProcessor from './ExtraConfigProcessor';
 
-/**
- * Updates consistency state.
- */
-async function updateConsistencyState(conversion: Conversion, dataPoolId: number): Promise<void> {
-    const logTitle: string = 'ConsistencyEnforcer::updateConsistencyState';
-    const sql: string = `UPDATE "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"
-        SET is_started = TRUE WHERE id = ${ dataPoolId };`;
-
-    const dbAccess: DBAccess = new DBAccess(conversion);
-    await dbAccess.query(logTitle, sql, DBVendors.PG, false, false);
-}
-
-/**
- * Retrieves the `is_started` value of current chunk.
- */
-async function getIsStarted(conversion: Conversion, dataPoolId: number): Promise<boolean> {
-    const logTitle: string = 'ConsistencyEnforcer::getIsStarted';
-    const sql: string = `SELECT is_started AS is_started
-        FROM "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"
-        WHERE id = ${ dataPoolId };`;
-
-    const dbAccess: DBAccess = new DBAccess(conversion);
-    const result: DBAccessQueryResult = await dbAccess.query(logTitle, sql, DBVendors.PG, false, false);
-    return result.error ? false : !!result.data.rows[0].is_started;
-}
-
-/**
- * Current data chunk runs after a disaster recovery.
- * Must determine if current chunk has already been loaded.
- * This is in order to prevent possible data duplications.
- */
-async function hasCurrentChunkLoaded(conversion: Conversion, chunk: any): Promise<boolean> {
-    const logTitle: string = 'ConsistencyEnforcer::hasCurrentChunkLoaded';
-    const originalTableName: string = extraConfigProcessor.getTableName(conversion, chunk._tableName, true);
-    const sql: string = `SELECT EXISTS(SELECT 1 FROM "${ conversion._schema }"."${ chunk._tableName }"
-        WHERE "${ conversion._schema }_${ originalTableName }_data_chunk_id_temp" = ${ chunk._id });`;
-
-    const dbAccess: DBAccess = new DBAccess(conversion);
-    const result: DBAccessQueryResult = await dbAccess.query(logTitle, sql, DBVendors.PG, false, false);
-    return result.error ? true : !!result.data.rows[0].exists;
-}
-
-/**
- * Determines consistency state.
- */
-async function getConsistencyState(conversion: Conversion, chunk: any): Promise<boolean> {
-    const isStarted: boolean = await getIsStarted(conversion, chunk._id);
-
-    // "isStarted" is false in normal migration flow.
-    return isStarted ? hasCurrentChunkLoaded(conversion, chunk) : false;
-}
-
-/**
- * Enforces consistency before processing a chunk of data.
- * Ensures there are no any data duplications.
- * In case of normal execution - it is a good practice.
- * In case of rerunning Nmig after unexpected failure - it is absolutely mandatory.
- */
-export async function enforceConsistency(conversion: Conversion, chunk: any): Promise<boolean> {
-    const hasAlreadyBeenLoaded: boolean = await getConsistencyState(conversion, chunk);
-
-    if (hasAlreadyBeenLoaded) {
-        // Current data chunk runs after a disaster recovery.
-        // It has already been loaded.
-        return false;
-    }
-
-    // Normal migration flow.
-    await updateConsistencyState(conversion, chunk._id);
-    return true;
-}
-
-/**
- * Drops the {conversion._schema + '_' + originalTableName + '_data_chunk_id_temp'} column from current table.
- */
-export async function dropDataChunkIdColumn(conversion: Conversion, tableName: string): Promise<void> {
-    const logTitle: string = 'ConsistencyEnforcer::dropDataChunkIdColumn';
-    const originalTableName: string = extraConfigProcessor.getTableName(conversion, tableName, true);
-    const columnToDrop: string = `${ conversion._schema }_${ originalTableName }_data_chunk_id_temp`;
-    const sql: string = `ALTER TABLE "${ conversion._schema }"."${ tableName }" DROP COLUMN "${ columnToDrop }";`;
+export async function dataTransferred(conversion: Conversion, dataPoolId: number): Promise<boolean> {
+    const logTitle: string = 'ConsistencyEnforcer::dataTransferred';
+    const dataPoolTable: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`;
+    const sqlGetMetadata: string = `SELECT metadata AS metadata FROM ${ dataPoolTable } WHERE id = ${ dataPoolId };`;
     const dbAccess: DBAccess = new DBAccess(conversion);
-    await dbAccess.query(logTitle, sql, DBVendors.PG, false, false);
-}
+
+    const result: DBAccessQueryResult = await dbAccess.query(
+        logTitle,
+        sqlGetMetadata,
+        DBVendors.PG,
+        true,
+        true
+    );
+
+    const metadata: any = JSON.parse(result.data.rows[0].metadata);
+    const targetTableName: string = `"${ conversion._schema }"."${ metadata._tableName }"`;
+    const sqlGetFirstRow: string = `SELECT * FROM ${ targetTableName } LIMIT 1 OFFSET 0;`;
+
+    const probe: DBAccessQueryResult = await dbAccess.query(
+        logTitle,
+        sqlGetFirstRow,
+        DBVendors.PG,
+        true,
+        false,
+        result.client
+    );
+
+    return probe.data.rows.length !== 0;
 }
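The new dataTransferred function replaces the per-chunk consistency machinery: instead of tracking is_started flags and chunk-id columns, it reads the data-pool record's metadata and probes the target table for at least one row. A hypothetical call site (not part of this commit; every name except dataTransferred and Conversion is assumed) could look like this:

import { dataTransferred } from './ConsistencyEnforcer';
import Conversion from './Conversion';

async function loadTableIfNeeded(conversion: Conversion, dataPoolId: number): Promise<void> {
    // A non-empty target table means this data-pool record was already
    // processed before a failure; skip it to avoid duplicating data.
    const alreadyLoaded: boolean = await dataTransferred(conversion, dataPoolId);

    if (alreadyLoaded) {
        return;
    }

    // ... stream the table's data into PostgreSQL via COPY ...
}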
3 changes: 0 additions & 3 deletions src/ConstraintsProcessor.ts
@@ -30,7 +30,6 @@ import processIndexAndKey from './IndexAndKeyProcessor';
 import processComments from './CommentsProcessor';
 import processForeignKey from './ForeignKeyProcessor';
 import processViews from './ViewGenerator';
-import { dropDataChunkIdColumn } from './ConsistencyEnforcer';
 import Conversion from './Conversion';
 
 /**
@@ -42,11 +41,9 @@ export default async function(conversion: Conversion): Promise<void> {
     const promises: Promise<void>[] = conversion._tablesToMigrate.map(async (tableName: string) => {
         if (!isTableConstraintsLoaded) {
             if (conversion.shouldMigrateOnlyDataFor(tableName)) {
-                await dropDataChunkIdColumn(conversion, tableName);
                 return sequencesProcessor.setSequenceValue(conversion, tableName);
             }
 
-            await dropDataChunkIdColumn(conversion, tableName);
             await processEnum(conversion, tableName);
             await processNull(conversion, tableName);
             await processDefault(conversion, tableName);
(diffs for the remaining changed files were not loaded)

0 comments on commit b347e22