diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 20243ef..b8a16ad 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -20,10 +20,12 @@ func Create(cacheCfg config.CacheCfg) ResponseCache { switch cacheCfg.Type { case "redis": monitoring.Log().Info("Creating redis cache", zap.Strings("addr", cacheCfg.Address)) - return NewRedis(cacheCfg.Address, cacheCfg.ClientConfig) + return NewRedis(cacheCfg.Address, cacheCfg.ClientConfig, CacheCfg{MaxItemSize: cacheCfg.MaxCacheItemSize}) case "redis-cluster": monitoring.Log().Info("Creating redis-cluster cache", zap.Strings("addr", cacheCfg.Address)) - return NewRedisCluster(cacheCfg.Address, cacheCfg.ClientConfig) + return NewRedisCluster(cacheCfg.Address, cacheCfg.ClientConfig, CacheCfg{ + MaxItemSize: cacheCfg.MaxCacheItemSize, + }) default: monitoring.Log().Info("Creating memory cache") return NewMemoryCache(cacheCfg.CacheSize) diff --git a/pkg/cache/redis.go b/pkg/cache/redis.go index 053648b..76f3e11 100644 --- a/pkg/cache/redis.go +++ b/pkg/cache/redis.go @@ -24,13 +24,18 @@ func parseAddress(addrs []string) map[string]string { return mp } +type CacheCfg struct { + MaxItemSize int64 +} + // RedisCache store response in redis type RedisCache struct { client *redisCache.Cache + cfg CacheCfg } // NewRedis create connection to redis and update it config from clientConfig map -func NewRedis(redisAddress []string, clientConfig map[string]string) *RedisCache { +func NewRedis(redisAddress []string, clientConfig map[string]string, cfg CacheCfg) *RedisCache { ring := goRedis.NewRing(&goRedis.RingOptions{ Addrs: parseAddress(redisAddress), }) @@ -45,10 +50,10 @@ func NewRedis(redisAddress []string, clientConfig map[string]string) *RedisCache } } - return &RedisCache{cache} + return &RedisCache{cache, cfg} } -func NewRedisCluster(redisAddress []string, clientConfig map[string]string) *RedisCache { +func NewRedisCluster(redisAddress []string, clientConfig map[string]string, cfg CacheCfg) *RedisCache { ring := 
goRedis.NewClusterClient(&goRedis.ClusterOptions{ Addrs: redisAddress, }) @@ -63,7 +68,7 @@ func NewRedisCluster(redisAddress []string, clientConfig map[string]string) *Red } } - return &RedisCache{cache} + return &RedisCache{cache, cfg} } func (c *RedisCache) getKey(obj *object.FileObject) string { @@ -72,6 +77,9 @@ func (c *RedisCache) getKey(obj *object.FileObject) string { // Set put response into cache func (c *RedisCache) Set(obj *object.FileObject, res *response.Response) error { + if res.ContentLength > c.cfg.MaxItemSize { + return nil + } monitoring.Report().Inc("cache_ratio;status:set") v, err := msgpack.Marshal(res) if err != nil { diff --git a/pkg/lock/memory.go b/pkg/lock/memory.go index b6ef772..c950c1e 100644 --- a/pkg/lock/memory.go +++ b/pkg/lock/memory.go @@ -75,8 +75,7 @@ func (m *MemoryLock) NotifyAndRelease(_ context.Context, key string, originalRes res, err := originalResponse.Copy() return res, err == nil } else { - res, err := originalResponse.CopyWithStream() - return res, err == nil + return nil, false } }) } diff --git a/pkg/response/response.go b/pkg/response/response.go index 6971ad2..5e25f79 100644 --- a/pkg/response/response.go +++ b/pkg/response/response.go @@ -10,7 +10,6 @@ import ( "github.com/aldor007/mort/pkg/monitoring" "github.com/aldor007/mort/pkg/object" "github.com/aldor007/mort/pkg/transforms" - "github.com/djherbis/stream" "github.com/pquerna/cachecontrol/cacheobject" "github.com/vmihailenco/msgpack" "go.uber.org/zap" @@ -42,8 +41,6 @@ type Response struct { bodyReader io.ReadCloser // original response buffer bodySeeker io.ReadSeeker - resStream *stream.Stream // response stream dispatcher - hasParent bool // flag indicated that response is a copy transformer bodyTransformFnc // function that can transform body writner cachable bool // flag indicating if response can be cached ttl int // time to live in cache @@ -95,6 +92,7 @@ func NewError(statusCode int, err error) *Response { res := Response{StatusCode: statusCode, 
errorValue: err} res.Headers = make(http.Header) res.Headers.Set(HeaderContentType, "application/json") + res.setBodyBytes([]byte(`{"message": "error"}`)) return &res } @@ -166,13 +164,6 @@ func (r *Response) Close() { r.bodyReader.Close() r.bodyReader = nil } - - if r.resStream != nil && r.hasParent == false { - go func() { - r.resStream.Close() - r.resStream.Remove() - }() - } } // SetDebug set flag indicating that response can including debug information @@ -342,40 +333,6 @@ func (r *Response) Copy() (*Response, error) { } -// CopyWithStream should be used with not buffered response that contain stream. -// It tries to duplicate response stream for multiple readers. -func (r *Response) CopyWithStream() (*Response, error) { - if r == nil { - return nil, errors.New("response is not created") - } - if r.body != nil { - return r.Copy() - } - // todo Add some protection mechanism to disallow CopyWithStream when Send or SendContent method is called. - // todo Also Stream method should be protected somehow, maybe if it possible should be removed in favor of SendContent. 
- - c := Response{StatusCode: r.StatusCode, ContentLength: r.ContentLength, debug: r.debug, errorValue: r.errorValue} - c.Headers = r.Headers.Clone() - if r.resStream != nil { - c.resStream = r.resStream - c.hasParent = true - return &c, nil - } - - r.bodyReader = r.reader - var err error - r.resStream, err = stream.New("res") - if err != nil { - return nil, err - } - c.resStream = r.resStream - c.hasParent = true - r.reader = ioutil.NopCloser(io.TeeReader(r.bodyReader, r.resStream)) - - return &c, nil - -} - // BytesReaderCloser wraps Bytes type ( @@ -398,11 +355,6 @@ func (c bytesReaderAtSeekerNopCloser) Close() error { // Stream return io.Reader interface from correct response content func (r *Response) Stream() io.ReadCloser { - if r.hasParent == true && r.resStream != nil { - r, _ := r.resStream.NextReader() - return ioutil.NopCloser(r) - } - if r.body != nil { return bytesReaderAtSeekerNopCloser{readerAtSeeker: bytes.NewReader(r.body)} } @@ -441,9 +393,8 @@ func (r *Response) writeDebug() { if err != nil { panic(err) } - r.reader = ioutil.NopCloser(bytes.NewReader(jsonBody)) - r.body = jsonBody - r.ContentLength = int64(len(jsonBody)) + r.Close() + r.setBodyBytes(jsonBody) r.SetContentType("application/json") } } diff --git a/pkg/response/response_test.go b/pkg/response/response_test.go index f9be3a7..d35bd53 100644 --- a/pkg/response/response_test.go +++ b/pkg/response/response_test.go @@ -93,8 +93,8 @@ func TestNewError(t *testing.T) { assert.Equal(t, res.Error(), err) buf, err := res.Body() - assert.NotNil(t, err, "Should return error when reading body") - assert.Nil(t, buf) + assert.Nil(t, err) + assert.Equal(t, string(buf), `{"message": "error"}`) p := &object.FileObject{} res.SetDebug(&object.FileObject{Debug: true, Parent: p, Transforms: transforms.Transforms{NotEmpty: true}}) @@ -282,29 +282,3 @@ func BenchmarkNewCopy(b *testing.B) { } } } - -func BenchmarkNewCopyWithStream(b *testing.B) { - buf := make([]byte, 1024*1024*4) - wBuf := make([]byte, 0, 
1024*1024*1) - for i := 0; i < b.N; i++ { - s := ioutil.NopCloser(bytes.NewReader(buf)) - w := bytes.NewBuffer(wBuf) - res := New(200, s) - res.Headers.Set("X-Header", "1") - res.SetContentType("text/html") - resCpy, _ := res.CopyWithStream() - go func() { - io.Copy(w, res.Stream()) - res.Close() - }() - - body, err := ioutil.ReadAll(resCpy.Stream()) - if err != nil { - b.Fatalf("Errors %s", err) - } - - if len(body) != len(buf) { - b.Fatalf("Inavlid body len %d != %d %d", len(body), len(buf), i) - } - } -} diff --git a/tests-int/Cache.Spec.js b/tests-int/Cache.Spec.js index 739325c..7beb733 100644 --- a/tests-int/Cache.Spec.js +++ b/tests-int/Cache.Spec.js @@ -68,7 +68,10 @@ describe('Image processing - cache', function () { return done(err2); } - expect(res.headers['x-mort-cache']).to.eql('hit'); + // If the response is not streamed, mort should have cached it + if (res.headers['content-length']) { + expect(res.headers['x-mort-cache']).to.eql('hit'); + } done(err) }) }) @@ -100,7 +103,10 @@ describe('Image processing - cache', function () { return done(err2); } - expect(res.headers['x-mort-cache']).to.eql('hit'); + // If the response is not streamed, mort should have cached it + if (res.headers['content-length']) { + expect(res.headers['x-mort-cache']).to.eql('hit'); + } done(err) }) })