diff --git a/CHANGELOG.md b/CHANGELOG.md index 068f444d7..1fb233b01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## v0.1.3 [UNRELEASED] + +### Breaking changes: +- namespace processing no longer expects there to be a "db" portion (i.e. "database.collection") +but an attempt to maintain backwards compatibility is still there for the time being. +[#258](https://github.com/compose/transporter/pull/258) + +### Bugfixes + ## v0.1.2 [2017-01-27] This release is primarily aimed at getting the MongoDB and Elasticsearch adaptors into a diff --git a/pkg/adaptor/elasticsearch/clients/registry.go b/pkg/adaptor/elasticsearch/clients/registry.go index 3583dd3bf..646285986 100644 --- a/pkg/adaptor/elasticsearch/clients/registry.go +++ b/pkg/adaptor/elasticsearch/clients/registry.go @@ -33,4 +33,5 @@ type ClientOptions struct { UserInfo *url.Userinfo HTTPClient *http.Client Path string + Index string } diff --git a/pkg/adaptor/elasticsearch/clients/v1/writer.go b/pkg/adaptor/elasticsearch/clients/v1/writer.go index 245a0a5c9..91b999edd 100644 --- a/pkg/adaptor/elasticsearch/clients/v1/writer.go +++ b/pkg/adaptor/elasticsearch/clients/v1/writer.go @@ -22,6 +22,7 @@ var ( // Writer implements client.Writer and client.Session for sending requests to an elasticsearch // cluster in individual requests. 
type Writer struct { + index string esClient *elastic.Client logger log.Logger } @@ -45,6 +46,7 @@ func init() { return nil, err } w := &Writer{ + index: opts.Index, esClient: esClient, logger: log.With("path", opts.Path).With("writer", "elasticsearch").With("version", 1), } @@ -56,7 +58,7 @@ func init() { func (w *Writer) Write(msg message.Msg) func(client.Session) error { return func(s client.Session) error { - i, t, _ := message.SplitNamespace(msg) + indexType := msg.Namespace() var id string if _, ok := msg.Data()["_id"]; ok { id = msg.ID() @@ -65,11 +67,11 @@ func (w *Writer) Write(msg message.Msg) func(client.Session) error { var err error switch msg.OP() { case ops.Delete: - _, err = w.esClient.Delete().Index(i).Type(t).Id(id).Do(context.TODO()) + _, err = w.esClient.Delete().Index(w.index).Type(indexType).Id(id).Do(context.TODO()) case ops.Insert: - _, err = w.esClient.Index().Index(i).Type(t).Id(id).BodyJson(msg.Data()).Do(context.TODO()) + _, err = w.esClient.Index().Index(w.index).Type(indexType).Id(id).BodyJson(msg.Data()).Do(context.TODO()) case ops.Update: - _, err = w.esClient.Index().Index(i).Type(t).BodyJson(msg.Data()).Id(id).Do(context.TODO()) + _, err = w.esClient.Index().Index(w.index).Type(indexType).BodyJson(msg.Data()).Id(id).Do(context.TODO()) } return err } diff --git a/pkg/adaptor/elasticsearch/clients/v1/writer_test.go b/pkg/adaptor/elasticsearch/clients/v1/writer_test.go index 11b6d5e6a..3c9902564 100644 --- a/pkg/adaptor/elasticsearch/clients/v1/writer_test.go +++ b/pkg/adaptor/elasticsearch/clients/v1/writer_test.go @@ -18,6 +18,7 @@ import ( const ( defaultURL = "http://127.0.0.1:9200" defaultIndex = "test_v1" + testType = "test" ) var ( @@ -28,10 +29,6 @@ func fullURL(suffix string) string { return fmt.Sprintf("%s/%s%s", testURL, defaultIndex, suffix) } -func testNS() string { - return fmt.Sprintf("%s.%s", defaultIndex, "test") -} - func setup() error { log.Debugln("setting up tests") return clearTestData() @@ -75,13 +72,14 @@ func 
TestWriter(t *testing.T) { URLs: []string{testURL}, HTTPClient: http.DefaultClient, Path: defaultIndex, + Index: defaultIndex, } vc := clients.Clients["v1"] w, _ := vc.Creator(done, &wg, opts) - w.Write(message.From(ops.Insert, testNS(), map[string]interface{}{"hello": "world"}))(nil) - w.Write(message.From(ops.Insert, testNS(), map[string]interface{}{"_id": "booya", "hello": "world"}))(nil) - w.Write(message.From(ops.Update, testNS(), map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) - w.Write(message.From(ops.Delete, testNS(), map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) + w.Write(message.From(ops.Insert, testType, map[string]interface{}{"hello": "world"}))(nil) + w.Write(message.From(ops.Insert, testType, map[string]interface{}{"_id": "booya", "hello": "world"}))(nil) + w.Write(message.From(ops.Update, testType, map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) + w.Write(message.From(ops.Delete, testType, map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) close(done) wg.Wait() diff --git a/pkg/adaptor/elasticsearch/clients/v2/writer.go b/pkg/adaptor/elasticsearch/clients/v2/writer.go index 0c5e8ce05..4bd3a4cfa 100644 --- a/pkg/adaptor/elasticsearch/clients/v2/writer.go +++ b/pkg/adaptor/elasticsearch/clients/v2/writer.go @@ -23,6 +23,7 @@ var ( // Writer implements client.Writer and client.Session for sending requests to an elasticsearch // cluster via its _bulk API. type Writer struct { + index string bp *elastic.BulkProcessor logger log.Logger } @@ -46,6 +47,7 @@ func init() { return nil, err } w := &Writer{ + index: opts.Index, logger: log.With("writer", "elasticsearch").With("version", 2).With("path", opts.Path), } p, err := esClient.BulkProcessor(). 
@@ -68,7 +70,7 @@ func init() { func (w *Writer) Write(msg message.Msg) func(client.Session) error { return func(s client.Session) error { - i, t, _ := message.SplitNamespace(msg) + indexType := msg.Namespace() var id string if _, ok := msg.Data()["_id"]; ok { id = msg.ID() @@ -81,11 +83,11 @@ func (w *Writer) Write(msg message.Msg) func(client.Session) error { // we need to flush any pending writes here or this could fail because we're using // more than 1 worker w.bp.Flush() - br = elastic.NewBulkDeleteRequest().Index(i).Type(t).Id(id) + br = elastic.NewBulkDeleteRequest().Index(w.index).Type(indexType).Id(id) case ops.Insert: - br = elastic.NewBulkIndexRequest().Index(i).Type(t).Id(id).Doc(msg.Data()) + br = elastic.NewBulkIndexRequest().Index(w.index).Type(indexType).Id(id).Doc(msg.Data()) case ops.Update: - br = elastic.NewBulkUpdateRequest().Index(i).Type(t).Id(id).Doc(msg.Data()) + br = elastic.NewBulkUpdateRequest().Index(w.index).Type(indexType).Id(id).Doc(msg.Data()) } w.bp.Add(br) return nil diff --git a/pkg/adaptor/elasticsearch/clients/v2/writer_test.go b/pkg/adaptor/elasticsearch/clients/v2/writer_test.go index 6efa28d5c..d99a09fe1 100644 --- a/pkg/adaptor/elasticsearch/clients/v2/writer_test.go +++ b/pkg/adaptor/elasticsearch/clients/v2/writer_test.go @@ -18,6 +18,7 @@ import ( const ( defaultURL = "http://127.0.0.1:9200" defaultIndex = "test_v2" + testType = "test" ) var ( @@ -28,10 +29,6 @@ func fullURL(suffix string) string { return fmt.Sprintf("%s/%s%s", testURL, defaultIndex, suffix) } -func testNS() string { - return fmt.Sprintf("%s.%s", defaultIndex, "test") -} - func setup() error { log.Debugln("setting up tests") return clearTestData() @@ -75,13 +72,14 @@ func TestWriter(t *testing.T) { URLs: []string{testURL}, HTTPClient: http.DefaultClient, Path: defaultIndex, + Index: defaultIndex, } vc := clients.Clients["v2"] w, _ := vc.Creator(done, &wg, opts) - w.Write(message.From(ops.Insert, testNS(), map[string]interface{}{"hello": "world"}))(nil) 
- w.Write(message.From(ops.Insert, testNS(), map[string]interface{}{"_id": "booya", "hello": "world"}))(nil) - w.Write(message.From(ops.Update, testNS(), map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) - w.Write(message.From(ops.Delete, testNS(), map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) + w.Write(message.From(ops.Insert, testType, map[string]interface{}{"hello": "world"}))(nil) + w.Write(message.From(ops.Insert, testType, map[string]interface{}{"_id": "booya", "hello": "world"}))(nil) + w.Write(message.From(ops.Update, testType, map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) + w.Write(message.From(ops.Delete, testType, map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) close(done) wg.Wait() diff --git a/pkg/adaptor/elasticsearch/clients/v5/writer.go b/pkg/adaptor/elasticsearch/clients/v5/writer.go index 0f96677c3..011ece3cd 100644 --- a/pkg/adaptor/elasticsearch/clients/v5/writer.go +++ b/pkg/adaptor/elasticsearch/clients/v5/writer.go @@ -23,6 +23,7 @@ var ( // Writer implements client.Writer and client.Session for sending requests to an elasticsearch // cluster via its _bulk API. type Writer struct { + index string bp *elastic.BulkProcessor logger log.Logger } @@ -46,6 +47,7 @@ func init() { return nil, err } w := &Writer{ + index: opts.Index, logger: log.With("writer", "elasticsearch").With("version", 5).With("path", opts.Path), } p, err := esClient.BulkProcessor(). 
@@ -68,7 +70,7 @@ func init() { func (w *Writer) Write(msg message.Msg) func(client.Session) error { return func(s client.Session) error { - i, t, _ := message.SplitNamespace(msg) + indexType := msg.Namespace() var id string if _, ok := msg.Data()["_id"]; ok { id = msg.ID() @@ -81,11 +83,11 @@ func (w *Writer) Write(msg message.Msg) func(client.Session) error { // we need to flush any pending writes here or this could fail because we're using // more than 1 worker w.bp.Flush() - br = elastic.NewBulkDeleteRequest().Index(i).Type(t).Id(id) + br = elastic.NewBulkDeleteRequest().Index(w.index).Type(indexType).Id(id) case ops.Insert: - br = elastic.NewBulkIndexRequest().Index(i).Type(t).Id(id).Doc(msg.Data()) + br = elastic.NewBulkIndexRequest().Index(w.index).Type(indexType).Id(id).Doc(msg.Data()) case ops.Update: - br = elastic.NewBulkUpdateRequest().Index(i).Type(t).Id(id).Doc(msg.Data()) + br = elastic.NewBulkUpdateRequest().Index(w.index).Type(indexType).Id(id).Doc(msg.Data()) } w.bp.Add(br) return nil diff --git a/pkg/adaptor/elasticsearch/clients/v5/writer_test.go b/pkg/adaptor/elasticsearch/clients/v5/writer_test.go index 0b70040ad..26d193626 100644 --- a/pkg/adaptor/elasticsearch/clients/v5/writer_test.go +++ b/pkg/adaptor/elasticsearch/clients/v5/writer_test.go @@ -18,6 +18,7 @@ import ( const ( defaultURL = "http://127.0.0.1:9200" defaultIndex = "test_v5" + testType = "test" ) var ( @@ -28,10 +29,6 @@ func fullURL(suffix string) string { return fmt.Sprintf("%s/%s%s", testURL, defaultIndex, suffix) } -func testNS() string { - return fmt.Sprintf("%s.%s", defaultIndex, "test") -} - func setup() error { log.Debugln("setting up tests") return clearTestData() @@ -75,13 +72,14 @@ func TestWriter(t *testing.T) { URLs: []string{testURL}, HTTPClient: http.DefaultClient, Path: defaultIndex, + Index: defaultIndex, } vc := clients.Clients["v5"] w, _ := vc.Creator(done, &wg, opts) - w.Write(message.From(ops.Insert, testNS(), map[string]interface{}{"hello": "world"}))(nil) 
- w.Write(message.From(ops.Insert, testNS(), map[string]interface{}{"_id": "booya", "hello": "world"}))(nil) - w.Write(message.From(ops.Update, testNS(), map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) - w.Write(message.From(ops.Delete, testNS(), map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) + w.Write(message.From(ops.Insert, testType, map[string]interface{}{"hello": "world"}))(nil) + w.Write(message.From(ops.Insert, testType, map[string]interface{}{"_id": "booya", "hello": "world"}))(nil) + w.Write(message.From(ops.Update, testType, map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) + w.Write(message.From(ops.Delete, testType, map[string]interface{}{"_id": "booya", "hello": "goodbye"}))(nil) close(done) wg.Wait() diff --git a/pkg/adaptor/elasticsearch/elasticsearch.go b/pkg/adaptor/elasticsearch/elasticsearch.go index 688bc86ce..5b6e7e9d5 100644 --- a/pkg/adaptor/elasticsearch/elasticsearch.go +++ b/pkg/adaptor/elasticsearch/elasticsearch.go @@ -71,15 +71,6 @@ func (e VersionError) Error() string { return fmt.Sprintf("%s running %s, %s", e.uri, e.v, e.err) } -// InvalidTimeoutError wraps the underlying error when the provided is not parsable time.ParseDuration -// type InvalidTimeoutError struct { -// timeout string -// } -// -// func (e InvalidTimeoutError) Error() string { -// return fmt.Sprintf("Invalid Timeout, %s", e.timeout) -// } - // Elasticsearch is an adaptor to connect a pipeline to // an elasticsearch cluster. 
type Elasticsearch struct { @@ -164,13 +155,12 @@ func (e *Elasticsearch) Stop() error { } func (e *Elasticsearch) applyOp(msg message.Msg) (message.Msg, error) { - _, msgColl, _ := message.SplitNamespace(msg) msgCopy := make(map[string]interface{}) // Copy from the original map to the target map for key, value := range msg.Data() { msgCopy[key] = value } - err := e.client.Write(message.From(msg.OP(), e.computeNamespace(msgColl), msgCopy))(nil) + err := e.client.Write(message.From(msg.OP(), msg.Namespace(), msgCopy))(nil) if err != nil { e.pipe.Err <- adaptor.NewError(adaptor.ERROR, e.path, fmt.Sprintf("write message error (%s)", err), msg.Data) @@ -178,10 +168,6 @@ func (e *Elasticsearch) applyOp(msg message.Msg) (message.Msg, error) { return msg, err } -func (e *Elasticsearch) computeNamespace(Type string) string { - return fmt.Sprintf("%s.%s", e.index, Type) -} - func (e *Elasticsearch) setupClient(conf Config) error { uri, err := url.Parse(conf.URI) if err != nil { @@ -221,6 +207,7 @@ func (e *Elasticsearch) setupClient(conf Config) error { UserInfo: uri.User, HTTPClient: httpClient, Path: e.path, + Index: e.index, } versionedClient, _ := vc.Creator(e.doneChannel, &e.wg, opts) e.client = versionedClient diff --git a/pkg/adaptor/etcd/etcd.go b/pkg/adaptor/etcd/etcd.go index 3f24a90e4..6633cf18c 100644 --- a/pkg/adaptor/etcd/etcd.go +++ b/pkg/adaptor/etcd/etcd.go @@ -6,10 +6,10 @@ import ( "strings" "time" - log "github.com/Sirupsen/logrus" "golang.org/x/net/context" "github.com/compose/transporter/pkg/adaptor" + "github.com/compose/transporter/pkg/log" "github.com/compose/transporter/pkg/message" "github.com/compose/transporter/pkg/message/adaptor/etcd" "github.com/compose/transporter/pkg/message/data" @@ -77,7 +77,7 @@ func init() { if conf.Timeout != "" { t, err := time.ParseDuration(conf.Timeout) if err != nil { - log.Printf("error parsing timeout, defaulting to 10s, %v", err) + log.Errorf("error parsing timeout, defaulting to 10s, %v", err) } else { 
e.sessionTimeout = t } diff --git a/pkg/adaptor/mongodb/bulk.go b/pkg/adaptor/mongodb/bulk.go index 0b130fc22..1815621c2 100644 --- a/pkg/adaptor/mongodb/bulk.go +++ b/pkg/adaptor/mongodb/bulk.go @@ -25,6 +25,7 @@ var ( // Bulk implements client.Writer for use with MongoDB and takes advantage of the Bulk API for // performance improvements. type Bulk struct { + db string bulkMap map[string]*bulkOperation *sync.RWMutex } @@ -40,8 +41,9 @@ type bulkOperation struct { *sync.Mutex } -func newBulker(done chan struct{}, wg *sync.WaitGroup) *Bulk { +func newBulker(db string, done chan struct{}, wg *sync.WaitGroup) *Bulk { b := &Bulk{ + db: db, bulkMap: make(map[string]*bulkOperation), RWMutex: &sync.RWMutex{}, } @@ -52,7 +54,7 @@ func newBulker(done chan struct{}, wg *sync.WaitGroup) *Bulk { func (b *Bulk) Write(msg message.Msg) func(client.Session) error { return func(s client.Session) error { - db, coll, _ := message.SplitNamespace(msg) + coll := msg.Namespace() b.RLock() bOp, ok := b.bulkMap[coll] b.RUnlock() @@ -60,7 +62,7 @@ func (b *Bulk) Write(msg message.Msg) func(client.Session) error { s := s.(*Session).mgoSession.Copy() bOp = &bulkOperation{ s: s, - bulk: s.DB(db).C(coll).Bulk(), + bulk: s.DB(b.db).C(coll).Bulk(), Mutex: &sync.Mutex{}, } b.Lock() diff --git a/pkg/adaptor/mongodb/bulk_test.go b/pkg/adaptor/mongodb/bulk_test.go index 45e5bc8a2..c774785d4 100644 --- a/pkg/adaptor/mongodb/bulk_test.go +++ b/pkg/adaptor/mongodb/bulk_test.go @@ -42,7 +42,7 @@ func checkBulkCount(c string, countQuery bson.M, expectedCount int, t *testing.T func TestBulkWrite(t *testing.T) { var wg sync.WaitGroup done := make(chan struct{}) - b := newBulker(done, &wg) + b := newBulker(bulkTestData.DB, done, &wg) ns := fmt.Sprintf("%s.%s", bulkTestData.DB, bulkTestData.C) for _, bt := range bulkTests { @@ -62,7 +62,7 @@ func TestBulkWrite(t *testing.T) { func TestBulkWriteMixedOps(t *testing.T) { var wg sync.WaitGroup done := make(chan struct{}) - b := newBulker(done, &wg) + b := 
newBulker(bulkTestData.DB, done, &wg) mixedModeC := "mixed_mode" ns := fmt.Sprintf("%s.%s", bulkTestData.DB, mixedModeC) @@ -89,7 +89,7 @@ func TestBulkWriteMixedOps(t *testing.T) { func TestBulkOpCount(t *testing.T) { var wg sync.WaitGroup done := make(chan struct{}) - b := newBulker(done, &wg) + b := newBulker(bulkTestData.DB, done, &wg) ns := fmt.Sprintf("%s.%s", bulkTestData.DB, "bar") for i := 0; i < maxObjSize; i++ { @@ -104,7 +104,7 @@ func TestBulkOpCount(t *testing.T) { func TestFlushOnDone(t *testing.T) { var wg sync.WaitGroup done := make(chan struct{}) - b := newBulker(done, &wg) + b := newBulker(bulkTestData.DB, done, &wg) ns := fmt.Sprintf("%s.%s", bulkTestData.DB, "baz") for i := 0; i < testBulkMsgCount; i++ { @@ -119,7 +119,7 @@ func TestFlushOnDone(t *testing.T) { func TestBulkMulitpleCollections(t *testing.T) { var wg sync.WaitGroup done := make(chan struct{}) - b := newBulker(done, &wg) + b := newBulker(bulkTestData.DB, done, &wg) ns1 := fmt.Sprintf("%s.%s", bulkTestData.DB, "multi_a") ns2 := fmt.Sprintf("%s.%s", bulkTestData.DB, "multi_b") @@ -142,10 +142,10 @@ func TestBulkMulitpleCollections(t *testing.T) { func TestBulkSize(t *testing.T) { b := &Bulk{ + db: bulkTestData.DB, bulkMap: make(map[string]*bulkOperation), RWMutex: &sync.RWMutex{}, } - ns := fmt.Sprintf("%s.%s", bulkTestData.DB, "size") var bsonSize int for i := 0; i < (maxObjSize - 1); i++ { doc := map[string]interface{}{"i": randStr(2), "rand": randStr(16)} @@ -156,7 +156,7 @@ func TestBulkSize(t *testing.T) { } bsonSize += (len(bs) + 4) - msg := message.From(ops.Insert, ns, doc) + msg := message.From(ops.Insert, "size", doc) b.Write(msg)(defaultSession) } bOp := b.bulkMap["size"] diff --git a/pkg/adaptor/mongodb/mongodb.go b/pkg/adaptor/mongodb/mongodb.go index d632f67d3..18c262adc 100644 --- a/pkg/adaptor/mongodb/mongodb.go +++ b/pkg/adaptor/mongodb/mongodb.go @@ -75,13 +75,13 @@ func init() { pipe: p, path: path, conf: conf, - writer: newWriter(), + writer: newWriter(db), 
reader: newReader(db), doneChannel: make(chan struct{}), } if conf.Bulk { - m.writer = newBulker(m.doneChannel, &m.wg) + m.writer = newBulker(db, m.doneChannel, &m.wg) } if conf.Tail { @@ -180,8 +180,7 @@ func (m *MongoDB) Stop() error { // writeMessage writes one message to the destination mongo, or sends an error down the pipe func (m *MongoDB) writeMessage(msg message.Msg) (message.Msg, error) { - _, msgColl, _ := message.SplitNamespace(msg) - err := client.Write(m.client, m.writer, message.From(msg.OP(), m.computeNamespace(msgColl), msg.Data())) + err := client.Write(m.client, m.writer, message.From(msg.OP(), msg.Namespace(), msg.Data())) if err != nil { m.pipe.Err <- adaptor.NewError(adaptor.ERROR, m.path, fmt.Sprintf("write message error (%s)", err), msg.Data) @@ -196,10 +195,6 @@ func (m *MongoDB) collectionFilter(collection string) bool { return m.collectionMatch.MatchString(collection) } -func (m *MongoDB) computeNamespace(collection string) string { - return fmt.Sprintf("%s.%s", m.database, collection) -} - // Config provides configuration options for a mongodb adaptor // the notable difference between this and dbConfig is the presence of the Tail option type Config struct { diff --git a/pkg/adaptor/mongodb/writer.go b/pkg/adaptor/mongodb/writer.go index 220f10e14..9daff50a5 100644 --- a/pkg/adaptor/mongodb/writer.go +++ b/pkg/adaptor/mongodb/writer.go @@ -13,11 +13,12 @@ var _ client.Writer = &Writer{} // Writer implements client.Writer for use with MongoDB type Writer struct { + db string writeMap map[ops.Op]func(message.Msg, *mgo.Collection) error } -func newWriter() *Writer { - w := &Writer{} +func newWriter(db string) *Writer { + w := &Writer{db: db} w.writeMap = map[ops.Op]func(message.Msg, *mgo.Collection) error{ ops.Insert: insertMsg, ops.Update: updateMsg, @@ -28,18 +29,17 @@ func newWriter() *Writer { func (w *Writer) Write(msg message.Msg) func(client.Session) error { return func(s client.Session) error { - w, ok := w.writeMap[msg.OP()] + 
writeFunc, ok := w.writeMap[msg.OP()] if !ok { log.Infof("no function registered for operation, %s\n", msg.OP()) return nil } - return w(msg, msgCollection(msg, s)) + return writeFunc(msg, msgCollection(w.db, msg, s)) } } -func msgCollection(msg message.Msg, s client.Session) *mgo.Collection { - db, coll, _ := message.SplitNamespace(msg) - return s.(*Session).mgoSession.DB(db).C(coll) +func msgCollection(db string, msg message.Msg, s client.Session) *mgo.Collection { + return s.(*Session).mgoSession.DB(db).C(msg.Namespace()) } func insertMsg(msg message.Msg, c *mgo.Collection) error { diff --git a/pkg/adaptor/mongodb/writer_test.go b/pkg/adaptor/mongodb/writer_test.go index a8af5ba5a..c7e8a40fc 100644 --- a/pkg/adaptor/mongodb/writer_test.go +++ b/pkg/adaptor/mongodb/writer_test.go @@ -27,7 +27,7 @@ var optests = []struct { } func TestOpFunc(t *testing.T) { - w := newWriter() + w := newWriter("test") for _, ot := range optests { if _, ok := w.writeMap[ot.op]; ok != ot.registered { t.Errorf("op (%s) registration incorrect, expected %+v, got %+v\n", ot.op.String(), ot.registered, ok) @@ -81,7 +81,7 @@ func TestInsert(t *testing.T) { if testing.Short() { t.Skip("skipping Insert in short mode") } - w := newWriter() + w := newWriter(writerTestData.DB) for _, it := range inserttests { for _, data := range it.data { msg := message.From(ops.Insert, fmt.Sprintf("%s.%s", writerTestData.DB, it.collection), data) @@ -130,7 +130,7 @@ func TestUpdate(t *testing.T) { if testing.Short() { t.Skip("skipping Update in short mode") } - w := newWriter() + w := newWriter(writerTestData.DB) for _, ut := range updatetests { ns := fmt.Sprintf("%s.%s", writerTestData.DB, ut.collection) // Insert data @@ -176,7 +176,7 @@ func TestDelete(t *testing.T) { if testing.Short() { t.Skip("skipping Update in short mode") } - w := newWriter() + w := newWriter(writerTestData.DB) for _, dt := range deletetests { ns := fmt.Sprintf("%s.%s", writerTestData.DB, dt.collection) // Insert data @@ -224,7 +224,7 
@@ func TestRestartWrites(t *testing.T) { log.Errorf("failed to drop database (%s), may affect tests!, %s", writerTestData.DB, dropErr) } - w := newWriter() + w := newWriter(writerTestData.DB) done := make(chan struct{}) go func() { for { diff --git a/pkg/message/adaptor/etcd/etcd.go b/pkg/message/adaptor/etcd/etcd.go index 385347f39..cfbb0e01d 100644 --- a/pkg/message/adaptor/etcd/etcd.go +++ b/pkg/message/adaptor/etcd/etcd.go @@ -2,6 +2,7 @@ package etcd import ( "fmt" + "strings" "time" "golang.org/x/net/context" @@ -40,11 +41,8 @@ func (r Adaptor) From(op ops.Op, namespace string, d data.Data) message.Msg { } func genKey(m message.Msg, dataKey string) (string, error) { - key, ns, err := message.SplitNamespace(m) - if err != nil { - return "", err - } - return fmt.Sprintf("/%s/%s/%s", key, ns, dataKey), nil + keyAndNs := strings.Split(m.Namespace(), ".") + return fmt.Sprintf("/%s/%s/%s", keyAndNs[0], keyAndNs[1], dataKey), nil } func (r Adaptor) Insert(m message.Msg) error { diff --git a/pkg/message/adaptor/rethinkdb/rethink.go b/pkg/message/adaptor/rethinkdb/rethink.go index 8bcda3d58..9c9caeb3f 100644 --- a/pkg/message/adaptor/rethinkdb/rethink.go +++ b/pkg/message/adaptor/rethinkdb/rethink.go @@ -39,31 +39,19 @@ func (r Adaptor) From(op ops.Op, namespace string, d data.Data) message.Msg { } func (r Adaptor) Insert(m message.Msg) error { - _, msgTable, err := message.SplitNamespace(m) - if err != nil { - return err - } - resp, err := gorethink.Table(msgTable).Insert(m.Data()).RunWrite(r.conn) + resp, err := gorethink.Table(m.Namespace()).Insert(m.Data()).RunWrite(r.conn) err = handleResponse(&resp) return err } func (r Adaptor) Delete(m message.Msg) error { - _, msgTable, err := message.SplitNamespace(m) - if err != nil { - return err - } - resp, err := gorethink.Table(msgTable).Get(m.ID()).Delete().RunWrite(r.conn) + resp, err := gorethink.Table(m.Namespace()).Get(m.ID()).Delete().RunWrite(r.conn) err = handleResponse(&resp) return err } func (r Adaptor) 
Update(m message.Msg) error { - _, msgTable, err := message.SplitNamespace(m) - if err != nil { - return err - } - resp, err := gorethink.Table(msgTable).Insert(m.Data(), gorethink.InsertOpts{Conflict: "replace"}).RunWrite(r.conn) + resp, err := gorethink.Table(m.Namespace()).Insert(m.Data(), gorethink.InsertOpts{Conflict: "replace"}).RunWrite(r.conn) err = handleResponse(&resp) return err } diff --git a/pkg/message/message.go b/pkg/message/message.go index 65cb2d646..d7fef2d43 100644 --- a/pkg/message/message.go +++ b/pkg/message/message.go @@ -57,6 +57,9 @@ func (m *Base) Timestamp() int64 { // Namespace returns the combination of database/table/colleciton for the underlying adaptor. func (m *Base) Namespace() string { + if strings.Contains(m.NS, ".") { + return strings.Split(m.NS, ".")[1] + } return m.NS } @@ -86,13 +89,3 @@ func (m *Base) ID() string { func MarshalData(m Msg) ([]byte, error) { return json.Marshal(m.Data()) } - -// SplitNamespace splits the nessage namespace into its constituent fields -func SplitNamespace(m Msg) (string, string, error) { - fields := strings.SplitN(m.Namespace(), ".", 2) - - if len(fields) != 2 { - return "", "", fmt.Errorf("malformed msg namespace") - } - return fields[0], fields[1], nil -} diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go index d75b915f3..d13ff297c 100644 --- a/pkg/pipe/pipe.go +++ b/pkg/pipe/pipe.go @@ -67,11 +67,7 @@ func NewPipe(pipe *Pipe, path string) *Pipe { // matchNamespace tests the message's namespace against the provided Regexp func matchNamespace(m message.Msg, nsFilter *regexp.Regexp) (bool, error) { - _, ns, err := message.SplitNamespace(m) - if err != nil { - return false, err - } - return nsFilter.MatchString(ns), nil + return nsFilter.MatchString(m.Namespace()), nil } // Listen starts a listening loop that pulls messages from the In chan, applies fn(msg), a `func(message.Msg) error`, and emits them on the Out channel.