Fast-csv read several files synchronously

I'm trying to read several files synchronously with fast-csv. It should look like:
read file 1
execute something while reading
read file 2
execute something while reading (it must be executed after the first file's execution; that's why I need to do this synchronously)
...
Here is my code simplified:
const csv = require('fast-csv');

const PROCEDURES = [
  { "name": "p1", "file": "p1.csv" },
  { "name": "p2", "file": "p2.csv" },
];

const launchProcedure = (name, file) => {
  try {
    const fs = require("fs");
    const stream = fs.createReadStream(file, {
      encoding: 'utf8'
    });
    console.log('launching parsing...');
    stream.once('readable', () => {
      // ignore first line
      let chunk;
      while (null !== (chunk = stream.read(1))) {
        if (chunk === '\n') {
          break;
        }
      }
      // CSV parsing
      const csvStream = csv.fromStream(stream, {
        renameHeaders: false,
        headers: true,
        delimiter: ',',
        rowDelimiter: '\n',
        quoteHeaders: false,
        quoteColumns: false
      }).on("data", data => {
        console.log('procedure execution...');
        // I execute a procedure...
      }).on("error", error => {
        logger.error(error);
      }).on("end", data => {
        logger.info(data);
      });
    });
  }
  catch (e) {
    logger.info(e);
  }
}

PROCEDURES.forEach(procedure => {
  launchProcedure(procedure.name, procedure.file);
});
Output will be:
launching parsing...
launching parsing...
procedure execution...
procedure execution...
The problem is with stream.once, but I used it to ignore the first line. I tried to promisify my function and to use async/await...
(I had a similar problem when executing my procedure, and I solved it by using csvStream.pause() and csvStream.resume().)
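The pause/resume pattern mentioned above looks roughly like this (a sketch; executeProcedure is a hypothetical stand-in for the actual procedure call):

csvStream.on("data", data => {
  csvStream.pause();                  // stop emitting rows while the procedure runs
  executeProcedure(data).then(() => { // hypothetical async procedure call
    csvStream.resume();               // continue with the next row
  });
});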
Any idea?
3 Answers
Using promises seems like a good way to solve this. Be aware, though, that when you create a promise with new Promise(executor), the executor is executed immediately. So you need to delay its creation until the previous promise has resolved.
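A quick illustration of that eagerness (a minimal standalone sketch, not part of the original answer):

const p = new Promise(resolve => {
  console.log('executor runs now'); // logged immediately, not when .then() is called
  resolve();
});
console.log('after new Promise');
// prints "executor runs now" first, then "after new Promise"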
To "promisify" the launchProcedure
function, you need to return a new Promise at the start of the function:
launchProcedure
const launchProcedure = (name, file) => {
  return new Promise((resolve, reject) => {
And then you need to call resolve (for success) and reject (for failure) whenever the parsing has finished.
Finally we need to string together the promises:
let promise = launchProcedure(PROCEDURES[0].name, PROCEDURES[0].file);
for (let i = 1; i < PROCEDURES.length; i++) {
  promise = promise.then(() => launchProcedure(PROCEDURES[i].name, PROCEDURES[i].file));
}
Note that I use a lambda function within the then to delay the creation of the Promise. (By the way, there are also nicer ways to string promises together; see the sketch below.)
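One such way (a sketch, not part of the original answer) is to fold the array with reduce, starting from an already-resolved promise:

const all = PROCEDURES.reduce(
  (chain, { name, file }) => chain.then(() => launchProcedure(name, file)),
  Promise.resolve()
);
all.then(() => console.log('all files parsed'));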
The final code looks like this:
const csv = require('fast-csv');

const PROCEDURES = [
  { "name": "p1", "file": "p1.csv" },
  { "name": "p2", "file": "p2.csv" },
];

const launchProcedure = (name, file) => {
  return new Promise((resolve, reject) => {
    try {
      const fs = require("fs");
      const stream = fs.createReadStream(file, {
        encoding: 'utf8'
      });
      console.log('launching parsing...');
      stream.once('readable', () => {
        // ignore first line
        let chunk;
        while (null !== (chunk = stream.read(1))) {
          if (chunk === '\n') {
            break;
          }
        }
        // CSV parsing
        const csvStream = csv.fromStream(stream, {
          renameHeaders: false,
          headers: true,
          delimiter: ',',
          rowDelimiter: '\n',
          quoteHeaders: false,
          quoteColumns: false
        }).on("data", data => {
          console.log('procedure execution...');
          // I execute a procedure...
        }).on("error", error => {
          logger.error(error);
          reject(error);
        }).on("end", data => {
          logger.info(data);
          resolve();
        });
      });
    }
    catch (e) {
      logger.info(e);
      reject(e);
    }
  });
}

let promise = launchProcedure(PROCEDURES[0].name, PROCEDURES[0].file);
for (let i = 1; i < PROCEDURES.length; i++) {
  promise = promise.then(() => launchProcedure(PROCEDURES[i].name, PROCEDURES[i].file));
}
promise.then(() => { console.log('all files parsed'); });
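One caveat (an added note, not from the original answer): if any file fails, the remaining then callbacks are skipped and the rejection goes unhandled, so it is worth ending the chain with a catch:

promise
  .then(() => { console.log('all files parsed'); })
  .catch(error => { console.error('a file failed:', error); });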
The problem here is that launchProcedure has to be async in order to use await. Another problem is that using async/await together with Array.forEach is not the best choice (see here).
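To see why forEach is a poor fit (a minimal sketch, assuming launchProcedure returns a promise): forEach ignores the promise returned by an async callback, so every call starts immediately and nothing is awaited:

PROCEDURES.forEach(async procedure => {
  await launchProcedure(procedure.name, procedure.file); // all calls start at once
});
console.log('this line is reached before any file has finished');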
You can do this using the "for-of" loop and await inside the loop body:
const csv = require('fast-csv');
const fs = require('fs');

const PROCEDURES = [
  { "name": "p1", "file": "p1.csv" },
  { "name": "p2", "file": "p2.csv" },
];

const launchProcedure = async (name, file) => {
  try {
    // only require this once in the file (the question's code requires it on every call to `launchProcedure`)
    // const fs = require("fs");
    const stream = fs.createReadStream(file, {
      encoding: 'utf8'
    });
    console.log('launching parsing...');
    // wait for the readable (or error) event
    const ready = await new Promise((resolve, reject) => {
      stream.on('readable', resolve);
      stream.on('error', reject);
    })
      .then(() => true)
      .catch(() => false);
    console.log('file is ready: ', ready);
    if (!ready) {
      throw new Error(`Unable to read file (file-name: "${file}")`);
    }
    // ignore first line
    let chunk;
    while (null !== (chunk = stream.read(1))) {
      if (chunk === '\n') {
        break;
      }
    }
    // CSV parsing
    const csvStream = csv.fromStream(stream, {
      renameHeaders: false,
      headers: true,
      delimiter: ',',
      rowDelimiter: '\n',
      quoteHeaders: false,
      quoteColumns: false
    }).on("data", data => {
      console.log('procedure execution...');
      // I execute a procedure...
    }).on("error", error => {
      logger.error(error);
    }).on("end", data => {
      logger.info(data);
    });
    console.log(`Done reading file (file-name: "${file}")`);
  }
  catch (e) {
    logger.info(e);
  }
}

// Wrap your iteration over the `PROCEDURES` array in an async function (this makes `await` available inside the function).
// Then use "for-of" here instead of forEach to get full async support.
const runProcedures = async (procedures) => {
  for (const procedure of procedures) {
    await launchProcedure(procedure.name, procedure.file);
  }
}

runProcedures(PROCEDURES);
OUTPUT:
launching parsing...
file is ready: true
Done reading file (file-name: "p1.csv")
launching parsing...
file is ready: true
Done reading file (file-name: "p2.csv")
Don't you have "procedure execution..." for each line of the file after "file is ready: true"?
– Curse
19 hours ago
@Curse No, I don't. This is probably because my csv files don't fit your CSV options. I didn't check that, tbh.
– FK82
15 hours ago
Hey, I found a solution in the meantime, before seeing your answers (sorry for posting it later)!
const async = require('async'); // assuming the "async" utility library from npm

const launchProcedure = (name, file, callback) => {
  try {
    const fs = require("fs");
    const stream = fs.createReadStream(file, {
      encoding: 'utf8'
    });
    console.log('launching parsing...');
    stream.once('readable', () => {
      // ignore first line
      let chunk;
      while (null !== (chunk = stream.read(1))) {
        if (chunk === '\n') {
          break;
        }
      }
      // CSV parsing
      const csvStream = csv.fromStream(stream, {
        renameHeaders: false,
        headers: true,
        delimiter: ',',
        rowDelimiter: '\n',
        quoteHeaders: false,
        quoteColumns: false
      }).on("data", data => {
        console.log('procedure execution...');
        // I execute a procedure...
      }).on("error", error => {
        logger.error(error);
      }).on("end", data => {
        logger.info(data);
        callback();
      });
    });
  }
  catch (e) {
    logger.info(e);
  }
}

async.eachSeries(PROCEDURES, (procedure, callback) => {
  launchProcedure(procedure.name, procedure.file, callback);
}, error => {
  if (error) {
    logger.error(error);
  }
  else {
    logger.info("done");
  }
});
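For reference (an added note, not part of the original post): async.eachSeries comes from the async utility library on npm (npm install async). It processes one item at a time and only moves on to the next once the current item's callback has been invoked, which is why calling callback() in the "end" handler makes the files parse sequentially.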