diff --git a/adaptor/mongodb/bulk.go b/adaptor/mongodb/bulk.go index 26397766..b7b6cfcc 100644 --- a/adaptor/mongodb/bulk.go +++ b/adaptor/mongodb/bulk.go @@ -13,8 +13,7 @@ import ( ) const ( - maxObjSize int = 1000 - maxBSONObjSize int = 4800000 + maxBSONObjSize int = 16000000 ) var ( @@ -53,7 +52,7 @@ func (b *Bulk) Write(msg message.Msg) func(client.Session) (message.Msg, error) b.confirmChan = msg.Confirms() bOp, ok := b.bulkMap[coll] if !ok { - s := s.(*Session).mgoSession.Copy() + s := s.(*Session).mgoSession.Clone() bOp = &bulkOperation{ s: s, bulk: s.DB("").C(coll).Bulk(), @@ -69,12 +68,15 @@ func (b *Bulk) Write(msg message.Msg) func(client.Session) (message.Msg, error) msgSize := len(bs) + 4 // if the next op is going to put us over, flush and recreate bOp - if bOp.opCounter >= maxObjSize || bOp.bsonOpSize+msgSize >= maxBSONObjSize { + if bOp.opCounter >= s.(*Session).maxWriteBatchSize || bOp.bsonOpSize+msgSize >= maxBSONObjSize { err = b.flush(coll, bOp) + if err != nil { + log.With("collection", coll).Infof("error flushing collection that has reached its size capacity: %s\n", err.Error()) + } if err == nil && b.confirmChan != nil { b.confirmChan <- struct{}{} } - s := s.(*Session).mgoSession.Copy() + s := s.(*Session).mgoSession.Clone() bOp = &bulkOperation{ s: s, bulk: s.DB("").C(coll).Bulk(), @@ -107,7 +109,7 @@ func (b *Bulk) run(done chan struct{}, wg *sync.WaitGroup) { return } case <-done: - log.Debugln("received done channel") + log.Infoln("received done channel") if err := b.flushAll(); err != nil { log.Errorf("flush error, %s", err) } @@ -131,7 +133,7 @@ func (b *Bulk) flushAll() error { } func (b *Bulk) flush(c string, bOp *bulkOperation) error { - log.With("collection", c).With("opCounter", bOp.opCounter).With("bsonOpSize", bOp.bsonOpSize).Debugln("flushing bulk messages") + log.With("collection", c).With("opCounter", bOp.opCounter).With("bsonOpSize", bOp.bsonOpSize).Infoln("flushing bulk messages") _, err := bOp.bulk.Run() if err != nil && !mgo.IsDup(err) { log.With("collection", c).Errorf("flush error, %s\n", err) @@ -144,7 +146,7 @@ func (b *Bulk) flush(c string, bOp *bulkOperation) error { } } bOp.s.Close() - log.With("collection", c).Debugln("flush complete") + log.With("collection", c).Infoln("flush complete") delete(b.bulkMap, c) return nil } diff --git a/adaptor/mongodb/client.go b/adaptor/mongodb/client.go index 13a9fbf6..41af8dce 100644 --- a/adaptor/mongodb/client.go +++ b/adaptor/mongodb/client.go @@ -28,6 +28,9 @@ const ( // DefaultReadPreference when connecting to a mongo replica set. DefaultReadPreference = mgo.Primary + + // DefaultMaxWriteBatchSize when using the bulk interface + DefaultMaxWriteBatchSize = 1000 ) var ( @@ -65,11 +68,12 @@ type ClientOptionFunc func(*Client) error type Client struct { uri string - safety mgo.Safe - tlsConfig *tls.Config - sessionTimeout time.Duration - tail bool - readPreference mgo.Mode + safety mgo.Safe + tlsConfig *tls.Config + sessionTimeout time.Duration + tail bool + readPreference mgo.Mode + maxWriteBatchSize int mgoSession *mgo.Session } @@ -91,12 +95,13 @@ type Client struct { func NewClient(options ...ClientOptionFunc) (*Client, error) { // Set up the client c := &Client{ - uri: DefaultURI, - sessionTimeout: DefaultSessionTimeout, - safety: DefaultSafety, - tlsConfig: nil, - tail: false, - readPreference: DefaultReadPreference, + uri: DefaultURI, + sessionTimeout: DefaultSessionTimeout, + safety: DefaultSafety, + tlsConfig: nil, + tail: false, + readPreference: DefaultReadPreference, + maxWriteBatchSize: DefaultMaxWriteBatchSize, } // Run the options on it @@ -270,11 +275,19 @@ func (c *Client) initConnection() error { // mgo logger _may_ be a bit too noisy but it'll be good to have for diagnosis mgo.SetLogger(log.Base()) mgoSession.EnsureSafe(&c.safety) - mgoSession.SetBatch(1000) + mgoSession.SetBatch(100000) mgoSession.SetPrefetch(0.5) mgoSession.SetSocketTimeout(time.Hour) mgoSession.SetMode(c.readPreference, true) + // Lets set the max batch size + var results bson.M + err = mgoSession.DB("").Run("isMaster", &results) + if err != nil { + return client.ConnectError{Reason: err.Error()} + } + c.maxWriteBatchSize = results["maxWriteBatchSize"].(int) + if c.tail { log.With("uri", c.uri).Infoln("testing oplog access") localColls, err := mgoSession.DB("local").CollectionNames() @@ -303,5 +316,5 @@ func (c *Client) initConnection() error { // Session fulfills the client.Client interface by providing a copy of the main mgoSession func (c *Client) session() client.Session { sess := c.mgoSession.Copy() - return &Session{sess} + return &Session{sess, c.maxWriteBatchSize} } diff --git a/adaptor/mongodb/session.go b/adaptor/mongodb/session.go index 3ea607c9..76c5abe6 100644 --- a/adaptor/mongodb/session.go +++ b/adaptor/mongodb/session.go @@ -8,6 +8,7 @@ import ( // Session serves as a wrapper for the underlying mgo.Session type Session struct { mgoSession *mgo.Session + maxWriteBatchSize int } var _ client.Session = &Session{}