-
Notifications
You must be signed in to change notification settings - Fork 215
add a Noop message type, and the ability to skip messages #95
Conversation
lgtm |
add a Noop message type, and the ability to skip messages
This feature is not working for me the following code drops all the messages module.exports = function(msg) {
if (msg.data['type'] != 'Company') {
msg.op = 'noop';
}
return msg;
} Here are the other two pieces of my configuration: var pipeline = Source({name:"sourcemongo"})
.transform({filename: "transformers/normaldoc.js", namespace: "BedrockDocs.normaldocs"})
.save({name:"destes"}) nodes:
sourcemongo:
type: mongodb
uri: mongodb://USER:PWD@HOST:PORT/BedrockDocs
namespace: BedrockDocs.normaldocs
debug: true
destes:
type: elasticsearch
uri: http://USER:PWD@HOST:PORT
namespace: ops.normaldocs
debug: true Any advice? |
hi @mpnovikova, you'll want to just return false rather than modify the op field like so: module.exports = function(msg) {
if (msg.data['type'] != 'Company') {
msg.op = false;
}
return msg;
} |
sorry... I gave you a bad example, the return value should be false module.exports = function(msg) {
if (msg.data['type'] != 'Company') {
return false;
}
return msg;
} |
That doesn't work either. It was the first thing I tried before I came across this conversation. Same behavior: all messages are being dropped instead of just selected ones. |
can you try the following just so we can confirm the messages are making it to the transform function? module.exports = function(msg) {
console.log("transformer: " + JSON.stringify(msg))
if (msg.data['type'] != 'Company') {
return false;
}
return msg;
} also, I assume you're running version |
Yes I am running |
ok, I haven't been able to reproduce locally, can you provide any of the log lines from the last run with the |
Well right now I am getting this and don't even get into skipping all the records: transporter run --config normaldocs.yaml normaldocs.js
INFO[0000] SYNC Processing db1:27017...
INFO[0000] Establishing new connection to db1:27017 (timeout=5s)...
INFO[0000] failed to parse duration, , falling back to default timeout of 30s
INFO[0000] Connection to db1:27017 established.
INFO[0000] adaptor Starting... path=sourcemongo
INFO[0000] starting Read func db=BedrockDocs
INFO[0000] boot map[sourcemongo:mongodb ccbc3c47-c46f-48f5-6027-13791acdf865:transformer destes:elasticsearch] ts=1486400048134095445
INFO[0000] adaptor Listening... path="sourcemongo/ccbc3c47-c46f-48f5-6027-13791acdf865/destes"
INFO[0000] Establishing new connection to db0:27017 (timeout=1h0m0s)...
INFO[0000] SYNC Adding db1:27017 to cluster as a slave.
INFO[0000] Connection to db0:27017 established.
INFO[0000] SYNC Synchronization was complete (got data from primary).
INFO[0000] SYNC Synchronization completed: 1 master(s) and 1 slave(s) alive.
INFO[0000] collection count db=BedrockDocs num_collections=5
INFO[0000] skipping iteration... collection="customer_doc_counts" db=BedrockDocs
INFO[0000] skipping iteration... collection=docstats db=BedrockDocs
INFO[0000] skipping iteration... collection="ipec_results" db=BedrockDocs
INFO[0000] sending for iteration... collection=normaldocs db=BedrockDocs
INFO[0000] skipping iteration... collection=system.indexes db=BedrockDocs
INFO[0000] done iterating collections db=BedrockDocs
INFO[0000] iterating... collection=normaldocs
INFO[0000] Establishing new connection to db0:27017 (timeout=1h0m0s)...
INFO[0000] Connection to db0:27017 established.
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0xbec242]
goroutine 47 [running]:
panic(0xe36080, 0xc42000e070)
/usr/local/Cellar/go/1.7.4/libexec/src/runtime/panic.go:500 +0x1a1
github.com/compose/transporter/vendor/gopkg.in/olivere/elastic%2ev5.(*BulkService).estimateSizeInBytes(0xc420082d10, 0x0, 0x0, 0xc4205f6be0)
/Users/JP/gocode/src/github.com/compose/transporter/vendor/gopkg.in/olivere/elastic.v5/bulk.go:147 +0x22
github.com/compose/transporter/vendor/gopkg.in/olivere/elastic%2ev5.(*BulkService).EstimatedSizeInBytes(0xc420082d10, 0xc420023718)
/Users/JP/gocode/src/github.com/compose/transporter/vendor/gopkg.in/olivere/elastic.v5/bulk.go:137 +0x9d
github.com/compose/transporter/vendor/gopkg.in/olivere/elastic%2ev5.(*bulkWorker).commitRequired(0xc42026db80, 0xc420023708)
/Users/JP/gocode/src/github.com/compose/transporter/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go:539 +0x61
github.com/compose/transporter/vendor/gopkg.in/olivere/elastic%2ev5.(*bulkWorker).work(0xc42026db80)
/Users/JP/gocode/src/github.com/compose/transporter/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go:439 +0x1d6
created by github.com/compose/transporter/vendor/gopkg.in/olivere/elastic%2ev5.(*BulkProcessor).Start
/Users/JP/gocode/src/github.com/compose/transporter/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go:297 +0x335 |
this will let a transformer filter / skip a message if it returns either false, or a message with an op of 'noop'
fixed #93