Skip to content
This repository has been archived by the owner on May 29, 2021. It is now read-only.

Commit

Permalink
Merge pull request #100 from mvertes/master
Browse files Browse the repository at this point in the history
Task serialization is now correct in all cases.
  • Loading branch information
mvertes authored Nov 4, 2016
2 parents e61e83a + 4fc259c commit 799f3e7
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 26 deletions.
15 changes: 8 additions & 7 deletions examples/cartesian.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@ var sc = require('skale-engine').context();

var data = [1, 2, 3, 4, 5, 6];
var data2 = [7, 8, 9, 10, 11, 12];
var nPartitions = 2;
var nPartitions = 3;

var a = sc.parallelize(data, nPartitions);
var b = sc.parallelize(data2, nPartitions);

a.cartesian(b)
.collect(function(err, res) {
res.sort();
console.log(res);
console.assert(JSON.stringify(res) === JSON.stringify([
[1, 7], [1, 8], [1, 9], [2, 7], [2, 8], [2, 9],
[3, 7], [3, 8], [3, 9], [1, 10], [1, 11], [1, 12],
[2, 10], [2, 11], [2, 12], [3, 10], [3, 11], [3, 12],
[4, 7], [4, 8], [4, 9], [5, 7], [5, 8], [5, 9],
[6, 7], [6, 8], [6, 9], [4, 10], [4, 11], [4, 12],
[5, 10], [5, 11], [5, 12], [6, 10], [6, 11], [6, 12]
[1, 10], [1, 11], [1, 12], [1, 7], [1, 8], [1, 9],
[2, 10], [2, 11], [2, 12], [2, 7], [2, 8], [2, 9],
[3, 10], [3, 11], [3, 12], [3, 7], [3, 8], [3, 9],
[4, 10], [4, 11], [4, 12], [4, 7], [4, 8], [4, 9],
[5, 10], [5, 11], [5, 12], [5, 7], [5, 8], [5, 9],
[6, 10], [6, 11], [6, 12], [6, 7], [6, 8], [6, 9]
]));
sc.end();
});
2 changes: 1 addition & 1 deletion examples/coGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ var sc = require('skale-engine').context();

var data = [['hello', 1], ['world', 2], ['cedric', 3], ['cedric', 4]];
var data2 = [['cedric', 3], ['world', 4], ['test', 5]];
var nPartitions = 1;
var nPartitions = 2;

var a = sc.parallelize(data, nPartitions);
var b = sc.parallelize(data2, nPartitions);
Expand Down
40 changes: 33 additions & 7 deletions lib/context-local.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,49 @@ function Context(args) {
}

function serialize(task) {
var dual = false;
for (var n in task.nodes) {
if (task.nodes[n].dependencies.length > 1) {
dual = true;
break;
}
var pleft;
var pright;
var nodeId;
var p = task.pid;
var node = task.nodes[task.datasetId];
var part = node.shufflePartitions ? node.shufflePartitions[p] : node.partitions[p];
var pindex = {};

// Walk through dataset ancestors to keep partition dependencies
while (part) {
pindex[part.datasetId] = p;
node = task.nodes[part.parentDatasetId];
if (!node) break;
p = part.parentPartitionIndex;
part = node.shufflePartitions ? node.shufflePartitions[p] : node.partitions[p];
}

return str = JSON.stringify(task, function(key, value) {
if (key == 'sc') return undefined;
if (key == 'dependencies') {
var dep = [];
for (var i = 0; i < value.length; i++) dep[i] = value[i].id;
return dep;
}
if ((key === 'partitions' || key === 'files') && !dual) {
if (key === 'pleft') pleft = value;
else if (key === 'pright') pright = value;
else if (key === 'id') nodeId = value;

// For shufflePartitions (not cartesian), return only the ones used by the task.
if (key === 'files' && ! value.path) {
var v = {};
v[pindex[nodeId]] = value[pindex[nodeId]];
return v;
}

// For cartesian shufflePartitions, return only the ones used by the task
if (key === 'shufflePartitions' && value[0] && value[0].files && value[0].files.path) {
var p1 = Math.floor(task.pid / pright);
var p2 = task.pid % pright + pleft;
v = {};
v[task.pid] = value[task.pid];
v[p1] = value[p1];
v[p2] = value[p2];
return v;
}
return (typeof value === 'function' ) ? value.toString() : value;
Expand Down
44 changes: 34 additions & 10 deletions lib/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,25 +141,49 @@ function SkaleContext(arg) {
}

function serialize(task) {
var dual = false;
for (var n in task.nodes) {
if (task.nodes[n].dependencies.length > 1) {
dual = true;
break;
}
var pleft;
var pright;
var nodeId;
var p = task.pid;
var node = task.nodes[task.datasetId];
var part = node.shufflePartitions ? node.shufflePartitions[p] : node.partitions[p];
var pindex = {};

// Walk through dataset ancestors to keep partition dependencies
while (part) {
pindex[part.datasetId] = p;
node = task.nodes[part.parentDatasetId];
if (!node) break;
p = part.parentPartitionIndex;
part = node.shufflePartitions ? node.shufflePartitions[p] : node.partitions[p];
}

return str = JSON.stringify(task, function(key, value) {
if (key == 'sc') {
return undefined;
}
if (key == 'sc') return undefined;
if (key == 'dependencies') {
var dep = [];
for (var i = 0; i < value.length; i++) dep[i] = value[i].id;
return dep;
}
if ((key === 'partitions' || key === 'files') && !dual) {
if (key === 'pleft') pleft = value;
else if (key === 'pright') pright = value;
else if (key === 'id') nodeId = value;

// For shufflePartitions (not cartesian), return only the ones used by the task.
if (key === 'files' && ! value.path) {
var v = {};
v[pindex[nodeId]] = value[pindex[nodeId]];
return v;
}

// For cartesian shufflePartitions, return only the ones used by the task
if (key === 'shufflePartitions' && value[0] && value[0].files && value[0].files.path) {
var p1 = Math.floor(task.pid / pright);
var p2 = task.pid % pright + pleft;
v = {};
v[task.pid] = value[task.pid];
v[p1] = value[p1];
v[p2] = value[p2];
return v;
}
return (typeof value === 'function' ) ? value.toString() : value;
Expand Down
2 changes: 1 addition & 1 deletion lib/dataset.js
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,7 @@ AggregateByKey.prototype.iterate = function (task, p, pipeline, done) {
processShuffleFile(files[cnt], processDone);

function processShuffleFile(file, done) {
task.log('processShuffleFile', p, file.path);
//task.log('processShuffleFile', p, file.path);
var lines = new task.lib.Lines();
task.getReadStream(file).pipe(lines);
lines.on('data', function (linev) {
Expand Down

0 comments on commit 799f3e7

Please sign in to comment.