
WIP: improve EDDB download and update times #23

Draft · wants to merge 8 commits into base: master
62 changes: 45 additions & 17 deletions modules/eddb/commodities.js
@@ -128,12 +128,25 @@ function Commodities() {
})
};

const bulkUpdateCallback = function(err, result){
if (err) {
console.log(`Errors: ${err.result.getWriteErrorCount()}, example: ${err.message}`);
result = err.result;
}
// Uncomment for database insertion progress
// if (result) {
// console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`);
// }
}

this.downloadUpdate = function () {
let recordsUpdated = 0;
let recordsFound = 0;
let operations = [];
let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/listings.csv', 'csv');
stream
.on('start', response => {
console.log(`EDDB commodity dump started with status code ${response.statusCode}`);
console.time('commodity');
this.emit('started', {
response: response,
insertion: "started",
@@ -142,26 +155,41 @@ function Commodities() {
})
.on('data', async json => {
stream.pause();
try {
await commoditiesModel.findOneAndUpdate(
{
id: json.id
},
json,
{
upsert: true,
runValidators: true
});
recordsUpdated++;
} catch (err) {
json.updated_at = utilities.modify.millisecondify(json.updated_at)
operations.push({
updateOne: {
filter: {
id: json.id,
// updated_at: { $ne: json.updated_at }
Author's comment: this is commented out because otherwise Mongo returns a duplicate key error for every record NOT updated. Filtering on id alone still updates any changed fields, e.g. 0 inserted, 1000 matched, 1 modified, 0 upserted. (A standalone sketch of this behaviour follows this file's diff.)

},
update: { $set: json },
upsert: true
}
});
recordsFound++;
if (operations.length % 1000 === 0 ) {
try {
await commoditiesModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
} catch (err) {
this.emit('error', err);
} finally {
stream.resume();
}
operations = [];
}
stream.resume();
})
.on('end', () => {
console.log(`${recordsUpdated} records updated`);
this.emit('done', recordsUpdated);
commoditiesModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
console.timeEnd('commodity');
console.log(`${recordsFound} records processed.`);
this.emit('done', recordsFound);
})
.on('error', err => {
this.emit('error', err);
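
The comment above points at a MongoDB behaviour worth spelling out: when the upsert filter also excludes documents by updated_at, an unchanged record matches nothing, so the upsert attempts a fresh insert with the same id and trips the unique index. The following is a minimal sketch of that failure mode, assuming a Mongoose model with a unique index on id and a locally reachable MongoDB instance; the model, connection string, and data are illustrative only, not part of this PR.

    // sketch.js — why the updated_at filter above causes duplicate key errors.
    // Assumes MongoDB is reachable at mongodb://localhost:27017 (adjust as needed).
    const mongoose = require('mongoose');

    const commoditySchema = new mongoose.Schema({
        id: { type: Number, unique: true },   // unique index, like an upsert key would have
        name: String,
        updated_at: Date
    });
    const Commodity = mongoose.model('Commodity', commoditySchema);

    async function main() {
        await mongoose.connect('mongodb://localhost:27017/eddb_sketch');
        await Commodity.deleteMany({});
        await Commodity.init();               // ensure the unique index is built

        const doc = { id: 1, name: 'Gold', updated_at: new Date('2020-01-01T00:00:00Z') };
        await Commodity.create(doc);

        // With updated_at in the filter, an unchanged record matches no document,
        // so the upsert tries to insert a second { id: 1 } and hits E11000.
        try {
            await Commodity.bulkWrite([{
                updateOne: {
                    filter: { id: doc.id, updated_at: { $ne: doc.updated_at } },
                    update: { $set: doc },
                    upsert: true
                }
            }], { ordered: false });
        } catch (err) {
            console.log(`duplicate key as expected: ${err.message}`);
        }

        // Filtering on id alone matches the existing document, so the record is
        // counted as matched (and modified only if a field actually changed).
        const result = await Commodity.bulkWrite([{
            updateOne: {
                filter: { id: doc.id },
                update: { $set: doc },
                upsert: true
            }
        }], { ordered: false });
        console.log(`${result.upsertedCount} upserted, ${result.matchedCount} matched, ${result.modifiedCount} modified`);

        await mongoose.disconnect();
    }

    main().catch(console.error);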
64 changes: 45 additions & 19 deletions modules/eddb/factions.js
@@ -131,12 +131,24 @@ function Factions() {
})
};

const bulkUpdateCallback = function(err, result){
if (err) {
console.log(`Errors: ${err.result.getWriteErrorCount()}, example: ${err.message}`);
result = err.result;
}
// if (result) {
// console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`);
// }
}

this.downloadUpdate = function () {
let recordsUpdated = 0;
let recordsFound = 0;
let operations = [];
let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/factions.json', 'json');
stream
.on('start', response => {
console.log(`EDDB faction dump started with status code ${response.statusCode}`);
console.time('faction')
this.emit('started', {
response: response,
insertion: "started",
@@ -145,32 +157,46 @@ function Factions() {
})
.on('data', async json => {
stream.pause();
try {
await factionsModel.findOneAndUpdate(
{
id: json.id,
updated_at: { $ne: json.updated_at }
},
json,
{
upsert: true,
runValidators: true
});
recordsUpdated++;
} catch (err) {
json.updated_at = utilities.modify.millisecondify(json.updated_at)
json.name_lower = utilities.modify.lowerify(json.name)
operations.push({
updateOne: {
filter: {
id: json.id,
// updated_at: { $ne: json.updated_at }
},
update: { $set: json },
upsert: true
}
});
recordsFound++;
if (operations.length % 1000 === 0 ) {
try {
await factionsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
} catch (err) {
this.emit('error', err);
} finally {
stream.resume();
}
operations = [];
}
stream.resume();
})
.on('end', () => {
console.log(`${recordsUpdated} records updated`);
this.emit('done', recordsUpdated);
factionsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
console.timeEnd('faction');
console.log(`${recordsFound} records processed.`);
this.emit('done', recordsFound);
})
.on('error', err => {
this.emit('error', err);
})
}
}

inherits(Factions, eventEmmiter);
66 changes: 48 additions & 18 deletions modules/eddb/populated_systems.js
@@ -133,12 +133,24 @@ function PopulatedSystems() {
})
};

const bulkUpdateCallback = function(err, result){
if (err) {
console.log(`Errors: ${err.result.getWriteErrorCount()}, example: ${err.message}`);
result = err.result;
}
// if (result) {
// console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`);
// }
}

this.downloadUpdate = function () {
let recordsUpdated = 0;
let recordsFound = 0;
let operations = [];
let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/systems_populated.json', 'json');
stream
.on('start', response => {
console.log(`EDDB populated system dump started with status code ${response.statusCode}`);
console.time('populatedSystems')
this.emit('started', {
response: response,
insertion: "started",
@@ -148,27 +160,42 @@ function PopulatedSystems() {
.on('data', async json => {
stream.pause();
json = modify(json);
try {
await populatedSystemsModel.findOneAndUpdate(
{
id: json.id,
updated_at: { $ne: json.updated_at }
},
json,
{
upsert: true,
runValidators: true
});
recordsUpdated++;
} catch (err) {
json.updated_at = utilities.modify.millisecondify(json.updated_at);
json.name_lower = utilities.modify.lowerify(json.name);
operations.push({
updateOne: {
filter: {
id: json.id,
// updated_at: { $ne: json.updated_at }
},
update: { $set: json },
upsert: true
}
});
recordsFound++;
if (operations.length % 1000 === 0 ) {
try {
await populatedSystemsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
} catch (err) {
this.emit('error', err);
} finally {
stream.resume();
}
operations = [];
}
stream.resume();
})
.on('end', () => {
console.log(`${recordsUpdated} records updated`);
this.emit('done', recordsUpdated);
populatedSystemsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
console.timeEnd('populatedSystems');
console.log(`${recordsFound} records processed.`);
this.emit('done', recordsFound);
})
.on('error', err => {
this.emit('error', err);
@@ -182,6 +209,9 @@ function PopulatedSystems() {
minor_faction_presences[index].active_states = statify(minor_faction_presence.active_states);
minor_faction_presences[index].pending_states = statify(minor_faction_presence.pending_states);
minor_faction_presences[index].recovering_states = statify(minor_faction_presence.recovering_states);
if (minor_faction_presence.name) {
minor_faction_presences[index].name_lower = utilities.modify.lowerify(minor_faction_presence.name);
}
});
return json;
}
71 changes: 53 additions & 18 deletions modules/eddb/stations.js
@@ -133,12 +133,24 @@ function Stations() {
})
}

const bulkUpdateCallback = function(err, result){
if (err) {
console.log(`Errors: ${err.result.getWriteErrorCount()}, example: ${err.message}`);
result = err.result;
}
// if (result) {
// console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`);
// }
}

this.downloadUpdate = function () {
let recordsUpdated = 0;
let recordsFound = 0;
let operations = [];
let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/stations.json', 'json');
stream
.on('start', response => {
console.log(`EDDB station dump started with status code ${response.statusCode}`);
console.time('stations')
this.emit('started', {
response: response,
insertion: "started",
@@ -148,27 +160,50 @@ function Stations() {
.on('data', async json => {
stream.pause();
json = modify(json);
try {
await stationsModel.findOneAndUpdate(
{
id: json.id,
updated_at: { $ne: json.updated_at }
},
json,
{
upsert: true,
runValidators: true
});
recordsUpdated++;
} catch (err) {
json.updated_at = utilities.modify.millisecondify(json.updated_at);
if (json.shipyard_updated_at) {
json.shipyard_updated_at = utilities.modify.millisecondify(json.shipyard_updated_at);
}
if (json.outfitting_updated_at) {
json.outfitting_updated_at = utilities.modify.millisecondify(json.outfitting_updated_at);
}
if (json.market_updated_at) {
json.market_updated_at = utilities.modify.millisecondify(json.market_updated_at);
}
operations.push({
updateOne: {
filter: {
id: json.id,
// updated_at: { $ne: json.updated_at }
},
update: { $set: json },
upsert: true
}
});
recordsFound++;
if (operations.length % 1000 === 0 ) {
try {
await stationsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
} catch (err) {
this.emit('error', err);
} finally {
stream.resume();
}
operations = [];
}
stream.resume();
})
.on('end', () => {
console.log(`${recordsUpdated} records updated`);
this.emit('done', recordsUpdated);
stationsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
console.timeEnd('stations');
console.log(`${recordsFound} records processed.`);
this.emit('done', recordsFound);
})
.on('error', err => {
this.emit('error', err);