From 7650646cd51478b91355b5218e46d8f67c7d8090 Mon Sep 17 00:00:00 2001 From: Anatoly Date: Sun, 30 Sep 2018 01:48:27 +0300 Subject: [PATCH] migrating to typescript --- src/DBAccess.ts | 49 ++++++++++++------------- src/DataLoader.ts | 11 +++--- src/DataPipeManager.ts | 11 +++--- src/ErrorGenerator.ts | 42 --------------------- src/FsOps.ts | 10 +++-- src/TableProcessor.ts | 5 +-- test/TestModules/ColumnTypesTest.ts | 2 +- test/TestModules/DataContentTest.ts | 2 +- test/TestModules/SchemaProcessorTest.ts | 2 +- test/TestModules/TestSchemaProcessor.ts | 15 +++++--- 10 files changed, 55 insertions(+), 94 deletions(-) delete mode 100644 src/ErrorGenerator.ts diff --git a/src/DBAccess.ts b/src/DBAccess.ts index 3ecb163..6349355 100644 --- a/src/DBAccess.ts +++ b/src/DBAccess.ts @@ -21,7 +21,7 @@ import * as mysql from 'mysql'; import { MysqlError, Pool as MySQLPool, PoolConnection } from 'mysql'; import { Pool as PgPool, PoolClient, QueryResult } from 'pg'; -import generateError from './ErrorGenerator'; +import { generateError } from './FsOps'; import Conversion from './Conversion'; import generateReport from './ReportGenerator'; import DBVendors from './DBVendors'; @@ -43,14 +43,14 @@ export default class DBAccess { /** * Ensures MySQL connection pool existence. */ - private _getMysqlConnection(): void { + private async _getMysqlConnection(): Promise { if (!this._conversion._mysql) { this._conversion._sourceConString.connectionLimit = this._conversion._maxDbConnectionPoolSize; this._conversion._sourceConString.multipleStatements = true; const pool: MySQLPool = mysql.createPool(this._conversion._sourceConString); if (!pool) { - generateError(this._conversion, '\t--[getMysqlConnection] Cannot connect to MySQL server...'); + await generateError(this._conversion, '\t--[getMysqlConnection] Cannot connect to MySQL server...'); process.exit(); } @@ -61,21 +61,21 @@ export default class DBAccess { /** * Ensures PostgreSQL connection pool existence. */ - private _getPgConnection(): void { + private async _getPgConnection(): Promise { if (!this._conversion._pg) { this._conversion._targetConString.max = this._conversion._maxDbConnectionPoolSize; const pool: PgPool = new PgPool(this._conversion._targetConString); if (!pool) { - generateError(this._conversion, '\t--[getPgConnection] Cannot connect to PostgreSQL server...'); + await generateError(this._conversion, '\t--[getPgConnection] Cannot connect to PostgreSQL server...'); process.exit(); } this._conversion._pg = pool; - this._conversion._pg.on('error', (error: Error) => { + this._conversion._pg.on('error', async (error: Error) => { const message: string = `Cannot connect to PostgreSQL server...\n' ${ error.message }\n${ error.stack }`; - generateError(this._conversion, message); + await generateError(this._conversion, message); generateReport(this._conversion, message); }); } @@ -85,9 +85,8 @@ export default class DBAccess { * Obtains PoolConnection instance. */ public getMysqlClient(): Promise { - this._getMysqlConnection(); - - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { + await this._getMysqlConnection(); (this._conversion._mysql).getConnection((err: MysqlError | null, connection: PoolConnection) => { return err ? reject(err) : resolve(connection); }); @@ -97,8 +96,8 @@ export default class DBAccess { /** * Obtains PoolClient instance. */ - public getPgClient(): Promise { - this._getPgConnection(); + public async getPgClient(): Promise { + await this._getPgConnection(); return (this._conversion._pg).connect(); } @@ -106,20 +105,20 @@ export default class DBAccess { * Runs a query on the first available idle client and returns its result. * Note, the pool does the acquiring and releasing of the client internally. */ - public runPgPoolQuery(sql: string): Promise { - this._getPgConnection(); + public async runPgPoolQuery(sql: string): Promise { + await this._getPgConnection(); return (this._conversion._pg).query(sql); } /** * Releases MySQL or PostgreSQL connection back to appropriate pool. */ - public releaseDbClient(dbClient?: PoolConnection | PoolClient): void { + public async releaseDbClient(dbClient?: PoolConnection | PoolClient): Promise { try { (dbClient).release(); dbClient = undefined; } catch (error) { - generateError(this._conversion, `\t--[DBAccess::releaseDbClient] ${ error }`); + await generateError(this._conversion, `\t--[DBAccess::releaseDbClient] ${ error }`); } } @@ -127,9 +126,9 @@ export default class DBAccess { * Checks if there are no more queries to be sent using current client. * In such case the client should be released. */ - private _releaseDbClientIfNecessary(client: PoolConnection | PoolClient, shouldHoldClient: boolean): void { + private async _releaseDbClientIfNecessary(client: PoolConnection | PoolClient, shouldHoldClient: boolean): Promise { if (!shouldHoldClient) { - this.releaseDbClient(client); + await this.releaseDbClient(client); } } @@ -154,7 +153,7 @@ export default class DBAccess { client = vendor === DBVendors.PG ? await this.getPgClient() : await this.getMysqlClient(); } catch (error) { // An error occurred when tried to obtain a client from one of pools. - generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql); + await generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql); return processExitOnError ? process.exit() : { client: client, data: undefined, error: error }; } } @@ -180,11 +179,11 @@ export default class DBAccess { sql = (client).format(sql, bindings); } - (client).query(sql, (error: MysqlError | null, data: any) => { - this._releaseDbClientIfNecessary((client), shouldReturnClient); + (client).query(sql, async (error: MysqlError | null, data: any) => { + await this._releaseDbClientIfNecessary((client), shouldReturnClient); if (error) { - generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql); + await generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql); return processExitOnError ? process.exit() : reject({ client: client, data: undefined, error: error }); } @@ -206,11 +205,11 @@ export default class DBAccess { ): Promise { try { const data: any = Array.isArray(bindings) ? await (client).query(sql, bindings) : await (client).query(sql); - this._releaseDbClientIfNecessary((client), shouldReturnClient); // Sets the client undefined. + await this._releaseDbClientIfNecessary((client), shouldReturnClient); // Sets the client undefined. return { client: client, data: data, error: undefined }; } catch (error) { - this._releaseDbClientIfNecessary((client), shouldReturnClient); // Sets the client undefined. - generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql); + await this._releaseDbClientIfNecessary((client), shouldReturnClient); // Sets the client undefined. + await generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql); return processExitOnError ? process.exit() : { client: client, data: undefined, error: error }; } } diff --git a/src/DataLoader.ts b/src/DataLoader.ts index e2516ad..f73e3f7 100644 --- a/src/DataLoader.ts +++ b/src/DataLoader.ts @@ -19,8 +19,7 @@ * @author Anatoly Khaytovich */ import * as csvStringify from './CsvStringifyModified'; -import { log } from './FsOps'; -import generateError from './ErrorGenerator'; +import { log, generateError } from './FsOps'; import Conversion from './Conversion'; import DBAccess from './DBAccess'; import DBAccessQueryResult from './DBAccessQueryResult'; @@ -72,7 +71,7 @@ async function deleteChunk(conv: Conversion, dataPoolId: number, client: PoolCli try { await client.query(sql); } catch (error) { - generateError(conv, `\t--[DataLoader::deleteChunk] ${ error }`, sql); + await generateError(conv, `\t--[DataLoader::deleteChunk] ${ error }`, sql); } finally { dbAccess.releaseDbClient(client); } @@ -88,7 +87,7 @@ function buildChunkQuery(tableName: string, selectFieldList: string, offset: num /** * Processes data-loading error. */ -function processDataError( +async function processDataError( conv: Conversion, streamError: string, sql: string, @@ -97,7 +96,7 @@ function processDataError( dataPoolId: number, client: PoolClient ): Promise { - generateError(conv, `\t--[populateTableWorker] ${ streamError }`, sqlCopy); + await generateError(conv, `\t--[populateTableWorker] ${ streamError }`, sqlCopy); const rejectedData: string = `\t--[populateTableWorker] Error loading table data:\n${ sql }\n`; log(conv, rejectedData, path.join(conv._logsDirPath, `${ tableName }.log`)); return deleteChunk(conv, dataPoolId, client); @@ -131,7 +130,7 @@ async function populateTableWorker( csvStringify(result.data, async (csvError: any, csvString: string) => { if (csvError) { - generateError(conv, `\t--[${ logTitle }] ${ csvError }`); + await generateError(conv, `\t--[${ logTitle }] ${ csvError }`); return resolvePopulateTableWorker(); } diff --git a/src/DataPipeManager.ts b/src/DataPipeManager.ts index ceafe7a..4ec443e 100644 --- a/src/DataPipeManager.ts +++ b/src/DataPipeManager.ts @@ -20,9 +20,8 @@ */ import { ChildProcess, fork } from 'child_process'; import * as path from 'path'; -import { log } from './FsOps'; +import { log, generateError } from './FsOps'; import Conversion from './Conversion'; -import generateError from './ErrorGenerator'; import MessageToDataLoader from './MessageToDataLoader'; import processConstraints from './ConstraintsProcessor'; import decodeBinaryData from './BinaryDataDecoder'; @@ -30,11 +29,11 @@ import decodeBinaryData from './BinaryDataDecoder'; /** * Kills a process specified by the pid. */ -function killProcess(pid: number, conversion: Conversion): void { +async function killProcess(pid: number, conversion: Conversion): Promise { try { process.kill(pid); } catch (killError) { - generateError(conversion, `\t--[killProcess] ${ killError }`); + await generateError(conversion, `\t--[killProcess] ${ killError }`); } } @@ -119,7 +118,7 @@ async function pipeData(conversion: Conversion, dataLoaderPath: string, options: const bandwidth: number[] = fillBandwidth(conversion); const chunksToLoad: any[] = bandwidth.map((index: number) => conversion._dataPool[index]); - loaderProcess.on('message', (signal: any) => { + loaderProcess.on('message', async (signal: any) => { if (typeof signal === 'object') { conversion._dicTables[signal.tableName].totalRowsInserted += signal.rowsInserted; const msg: string = `\t--[pipeData] For now inserted: ${ conversion._dicTables[signal.tableName].totalRowsInserted } rows, @@ -129,7 +128,7 @@ async function pipeData(conversion: Conversion, dataLoaderPath: string, options: return; } - killProcess(loaderProcess.pid, conversion); + await killProcess(loaderProcess.pid, conversion); conversion._processedChunks += chunksToLoad.length; return pipeData(conversion, dataLoaderPath, options); }); diff --git a/src/ErrorGenerator.ts b/src/ErrorGenerator.ts deleted file mode 100644 index 28034c2..0000000 --- a/src/ErrorGenerator.ts +++ /dev/null @@ -1,42 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -import * as fs from 'fs'; -import { log } from './FsOps'; -import Conversion from './Conversion'; - -/** - * Writes a ditailed error message to the "/errors-only.log" file - */ -export default (conversion: Conversion, message: string, sql: string = ''): void => { - message += `\n\n\tSQL: ${sql}\n\n`; - const buffer: Buffer = Buffer.from(message, conversion._encoding); - log(conversion, message, undefined, true); - - fs.open(conversion._errorLogsPath, 'a', conversion._0777, (error: Error, fd: number) => { - if (!error) { - fs.write(fd, buffer, 0, buffer.length, null, () => { - fs.close(fd, () => { - // Each async function MUST have a callback (according to Node.js >= 7). - }); - }); - } - }); -} diff --git a/src/FsOps.ts b/src/FsOps.ts index db12d82..2f43eeb 100644 --- a/src/FsOps.ts +++ b/src/FsOps.ts @@ -32,11 +32,13 @@ export function generateError(conversion: Conversion, message: string, sql: stri log(conversion, message, undefined, true); fs.open(conversion._errorLogsPath, 'a', conversion._0777, (error: Error, fd: number) => { - if (!error) { - fs.write(fd, buffer, 0, buffer.length, null, () => { - fs.close(fd, () => resolve()); - }); + if (error) { + return resolve(); } + + fs.write(fd, buffer, 0, buffer.length, null, () => { + fs.close(fd, () => resolve()); + }); }); }); } diff --git a/src/TableProcessor.ts b/src/TableProcessor.ts index e1270fa..00e4fa5 100644 --- a/src/TableProcessor.ts +++ b/src/TableProcessor.ts @@ -18,8 +18,7 @@ * * @author Anatoly Khaytovich */ -import { log } from './FsOps'; -import generateError from './ErrorGenerator'; +import { log, generateError } from './FsOps'; import Conversion from './Conversion'; import DBAccess from './DBAccess'; import DBAccessQueryResult from './DBAccessQueryResult'; @@ -97,7 +96,7 @@ export async function createTable(conversion: Conversion, tableName: string): Pr const result: DBAccessQueryResult = await dbAccess.query(logTitle, sqlAddDataChunkIdColumn, DBVendors.PG, false, false); if (result.error) { - generateError(conversion, `\t--[${ logTitle }] ${ result.error }`, sqlAddDataChunkIdColumn); + await generateError(conversion, `\t--[${ logTitle }] ${ result.error }`, sqlAddDataChunkIdColumn); } return; diff --git a/test/TestModules/ColumnTypesTest.ts b/test/TestModules/ColumnTypesTest.ts index 35477fd..a4288b9 100644 --- a/test/TestModules/ColumnTypesTest.ts +++ b/test/TestModules/ColumnTypesTest.ts @@ -45,7 +45,7 @@ async function getColumnTypes(testSchemaProcessor: TestSchemaProcessor): Promise ); if (result.error) { - testSchemaProcessor.processFatalError(result.error); + await testSchemaProcessor.processFatalError(result.error); } return result.data.rows; diff --git a/test/TestModules/DataContentTest.ts b/test/TestModules/DataContentTest.ts index f862b07..6027051 100644 --- a/test/TestModules/DataContentTest.ts +++ b/test/TestModules/DataContentTest.ts @@ -42,7 +42,7 @@ async function retrieveData(testSchemaProcessor: TestSchemaProcessor): Promise { console.log(error); - generateError(this.conversion, error); + await generateError(this.conversion, error); process.exit(); }