Skip to content

Commit

Permalink
fix: memory lock panic #49
Browse files Browse the repository at this point in the history
  • Loading branch information
aldor007 committed Oct 24, 2022
1 parent ad078e6 commit e4bba26
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 90 deletions.
6 changes: 4 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ func Create(cacheCfg config.CacheCfg) ResponseCache {
switch cacheCfg.Type {
case "redis":
monitoring.Log().Info("Creating redis cache", zap.Strings("addr", cacheCfg.Address))
return NewRedis(cacheCfg.Address, cacheCfg.ClientConfig)
return NewRedis(cacheCfg.Address, cacheCfg.ClientConfig, CacheCfg{MaxItemSize: cacheCfg.MaxCacheItemSize})
case "redis-cluster":
monitoring.Log().Info("Creating redis-cluster cache", zap.Strings("addr", cacheCfg.Address))
return NewRedisCluster(cacheCfg.Address, cacheCfg.ClientConfig)
return NewRedisCluster(cacheCfg.Address, cacheCfg.ClientConfig, CacheCfg{
MaxItemSize: cacheCfg.MaxCacheItemSize,
})
default:
monitoring.Log().Info("Creating memory cache")
return NewMemoryCache(cacheCfg.CacheSize)
Expand Down
16 changes: 12 additions & 4 deletions pkg/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@ func parseAddress(addrs []string) map[string]string {
return mp
}

// CacheCfg holds tunables for a response cache instance.
type CacheCfg struct {
	// MaxItemSize is the largest response body, in bytes, that Set will
	// write to the cache; responses with a larger ContentLength are skipped.
	// NOTE(review): the zero value makes Set skip every response whose
	// ContentLength > 0 — confirm all callers populate this field.
	MaxItemSize int64
}

// RedisCache stores responses in Redis (single ring or cluster).
type RedisCache struct {
	client *redisCache.Cache // go-redis cache client backing the store
	cfg    CacheCfg          // size limits enforced by Set before writing
}

// NewRedis create connection to redis and update it config from clientConfig map
func NewRedis(redisAddress []string, clientConfig map[string]string) *RedisCache {
func NewRedis(redisAddress []string, clientConfig map[string]string, cfg CacheCfg) *RedisCache {
ring := goRedis.NewRing(&goRedis.RingOptions{
Addrs: parseAddress(redisAddress),
})
Expand All @@ -45,10 +50,10 @@ func NewRedis(redisAddress []string, clientConfig map[string]string) *RedisCache
}
}

return &RedisCache{cache}
return &RedisCache{cache, cfg}
}

func NewRedisCluster(redisAddress []string, clientConfig map[string]string) *RedisCache {
func NewRedisCluster(redisAddress []string, clientConfig map[string]string, cfg CacheCfg) *RedisCache {
ring := goRedis.NewClusterClient(&goRedis.ClusterOptions{
Addrs: redisAddress,
})
Expand All @@ -63,7 +68,7 @@ func NewRedisCluster(redisAddress []string, clientConfig map[string]string) *Red
}
}

return &RedisCache{cache}
return &RedisCache{cache, cfg}
}

func (c *RedisCache) getKey(obj *object.FileObject) string {
Expand All @@ -72,6 +77,9 @@ func (c *RedisCache) getKey(obj *object.FileObject) string {

// Set put response into cache
func (c *RedisCache) Set(obj *object.FileObject, res *response.Response) error {
if res.ContentLength > c.cfg.MaxItemSize {
return nil
}
monitoring.Report().Inc("cache_ratio;status:set")
v, err := msgpack.Marshal(res)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/lock/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ func (m *MemoryLock) NotifyAndRelease(_ context.Context, key string, originalRes
res, err := originalResponse.Copy()
return res, err == nil
} else {
res, err := originalResponse.CopyWithStream()
return res, err == nil
return nil, false
}
})
}
Expand Down
55 changes: 3 additions & 52 deletions pkg/response/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/aldor007/mort/pkg/monitoring"
"github.com/aldor007/mort/pkg/object"
"github.com/aldor007/mort/pkg/transforms"
"github.com/djherbis/stream"
"github.com/pquerna/cachecontrol/cacheobject"
"github.com/vmihailenco/msgpack"
"go.uber.org/zap"
Expand Down Expand Up @@ -42,8 +41,6 @@ type Response struct {
bodyReader io.ReadCloser // original response buffer
bodySeeker io.ReadSeeker

resStream *stream.Stream // response stream dispatcher
hasParent bool // flag indicated that response is a copy
transformer bodyTransformFnc // function that can transform body writner
cachable bool // flag indicating if response can be cached
ttl int // time to live in cache
Expand Down Expand Up @@ -95,6 +92,7 @@ func NewError(statusCode int, err error) *Response {
res := Response{StatusCode: statusCode, errorValue: err}
res.Headers = make(http.Header)
res.Headers.Set(HeaderContentType, "application/json")
res.setBodyBytes([]byte(`{"message": "error"}`))
return &res
}

Expand Down Expand Up @@ -166,13 +164,6 @@ func (r *Response) Close() {
r.bodyReader.Close()
r.bodyReader = nil
}

if r.resStream != nil && r.hasParent == false {
go func() {
r.resStream.Close()
r.resStream.Remove()
}()
}
}

// SetDebug set flag indicating that response can including debug information
Expand Down Expand Up @@ -342,40 +333,6 @@ func (r *Response) Copy() (*Response, error) {

}

// CopyWithStream should be used with a not-yet-buffered response that wraps a
// stream. It tries to duplicate the response stream for multiple readers:
// the first call lazily creates a shared stream dispatcher and tees the
// parent's reader into it; each returned copy reads from that dispatcher
// (see Stream). Copies are flagged hasParent so closing a copy does not tear
// the shared dispatcher down (see Close).
func (r *Response) CopyWithStream() (*Response, error) {
	if r == nil {
		return nil, errors.New("response is not created")
	}
	// Fully buffered responses do not need stream duplication — a plain
	// value copy is sufficient.
	if r.body != nil {
		return r.Copy()
	}
	// todo Add some protection mechanism to disallow CopyWithStream when Send or SendContent method is called.
	// todo Also Stream method should be protected somehow, maybe if it possible should be removed in favor of SendContent.

	c := Response{StatusCode: r.StatusCode, ContentLength: r.ContentLength, debug: r.debug, errorValue: r.errorValue}
	c.Headers = r.Headers.Clone()
	// Dispatcher already exists: attach another child copy to it.
	if r.resStream != nil {
		c.resStream = r.resStream
		c.hasParent = true
		return &c, nil
	}

	// First copy: create the shared dispatcher and replace the parent's
	// reader with a tee, so every byte read from the parent is mirrored
	// into the stream that the copies consume.
	r.bodyReader = r.reader
	var err error
	r.resStream, err = stream.New("res")
	if err != nil {
		return nil, err
	}
	c.resStream = r.resStream
	c.hasParent = true
	r.reader = ioutil.NopCloser(io.TeeReader(r.bodyReader, r.resStream))

	return &c, nil

}

// BytesReaderCloser wraps Bytes

type (
Expand All @@ -398,11 +355,6 @@ func (c bytesReaderAtSeekerNopCloser) Close() error {

// Stream return io.Reader interface from correct response content
func (r *Response) Stream() io.ReadCloser {
if r.hasParent == true && r.resStream != nil {
r, _ := r.resStream.NextReader()
return ioutil.NopCloser(r)
}

if r.body != nil {
return bytesReaderAtSeekerNopCloser{readerAtSeeker: bytes.NewReader(r.body)}
}
Expand Down Expand Up @@ -441,9 +393,8 @@ func (r *Response) writeDebug() {
if err != nil {
panic(err)
}
r.reader = ioutil.NopCloser(bytes.NewReader(jsonBody))
r.body = jsonBody
r.ContentLength = int64(len(jsonBody))
r.Close()
r.setBodyBytes(jsonBody)
r.SetContentType("application/json")
}
}
30 changes: 2 additions & 28 deletions pkg/response/response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func TestNewError(t *testing.T) {
assert.Equal(t, res.Error(), err)

buf, err := res.Body()
assert.NotNil(t, err, "Should return error when reading body")
assert.Nil(t, buf)
assert.Nil(t, err)
assert.Equal(t, string(buf), `{"message": "error"}`)

p := &object.FileObject{}
res.SetDebug(&object.FileObject{Debug: true, Parent: p, Transforms: transforms.Transforms{NotEmpty: true}})
Expand Down Expand Up @@ -282,29 +282,3 @@ func BenchmarkNewCopy(b *testing.B) {
}
}
}

// BenchmarkNewCopyWithStream measures duplicating a 4 MiB streamed response
// via CopyWithStream: the parent body is drained in a goroutine (which feeds
// the tee) while the copy's stream is read in full and length-checked.
func BenchmarkNewCopyWithStream(b *testing.B) {
	buf := make([]byte, 1024*1024*4)
	wBuf := make([]byte, 0, 1024*1024*1)
	for i := 0; i < b.N; i++ {
		s := ioutil.NopCloser(bytes.NewReader(buf))
		w := bytes.NewBuffer(wBuf)
		res := New(200, s)
		res.Headers.Set("X-Header", "1")
		res.SetContentType("text/html")
		resCpy, _ := res.CopyWithStream()
		// Drain the parent concurrently; the copy's reader only advances as
		// the parent's body is consumed through the tee.
		go func() {
			io.Copy(w, res.Stream())
			res.Close()
		}()

		body, err := ioutil.ReadAll(resCpy.Stream())
		if err != nil {
			b.Fatalf("Errors %s", err)
		}

		if len(body) != len(buf) {
			// fix: failure message typo "Inavlid" -> "Invalid"
			b.Fatalf("Invalid body len %d != %d %d", len(body), len(buf), i)
		}
	}
}
10 changes: 8 additions & 2 deletions tests-int/Cache.Spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ describe('Image processing - cache', function () {
return done(err2);
}

expect(res.headers['x-mort-cache']).to.eql('hit');
// If not stream mort should cache response
if (res.headers['content-length']) {
expect(res.headers['x-mort-cache']).to.eql('hit');
}
done(err)
})
})
Expand Down Expand Up @@ -100,7 +103,10 @@ describe('Image processing - cache', function () {
return done(err2);
}

expect(res.headers['x-mort-cache']).to.eql('hit');
// If not stream mort should cache response
if (res.headers['content-length']) {
expect(res.headers['x-mort-cache']).to.eql('hit');
}
done(err)
})
})
Expand Down

0 comments on commit e4bba26

Please sign in to comment.