Skip to content

Commit

Permalink
go channel in processor
Browse files Browse the repository at this point in the history
  • Loading branch information
aldor007 committed Nov 3, 2017
1 parent 4b3349a commit df17c2c
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 14 deletions.
2 changes: 1 addition & 1 deletion object/file_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewFileObject(uri string, mortConfig *config.Config) (*FileObject, error) {
obj.Uri = uri

err := obj.decode(mortConfig)
log.Log().Infow("New FileObject", "path", uri, "key", obj.Key, "bucket", obj.Bucket, "storage", obj.Storage,
log.Log().Infow("New FileObject", "path", uri, "key", obj.Key, "bucket", obj.Bucket, "storage", obj.Storage.Kind,
"hasTransforms", obj.HasTransform(), "hasParent" , obj.HasParent())
return &obj, err
}
Expand Down
54 changes: 42 additions & 12 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,28 +106,58 @@ func hanldeGET(ctx echo.Context, obj *object.FileObject) *response.Response {
}
}


resChan := make(chan *response.Response)
parentChan := make(chan *response.Response)

// check if object is on storage
res = updateHeaders(storage.Get(obj))
if res.StatusCode == 200 {
return res
}
go func(o *object.FileObject) {
resChan <- storage.Get(o)
}(obj)

// get parent from storage
if parentObj != nil {
parentRes = updateHeaders(storage.Get(parentObj))

if parentRes.StatusCode != 200 {
return parentRes
}
go func(p *object.FileObject) {
parentChan <- storage.Head(p)
}(parentObj)
}

resLoop:
for {
select {
case res = <-resChan:
if parentObj != nil && (parentRes == nil || parentRes.StatusCode == 0) {
go func () {
resChan <- res
}()

} else {
if res.StatusCode == 200 && parentRes.StatusCode == 200 {
return updateHeaders(res)
}

if res.StatusCode == 404 {
break resLoop
} else {
return updateHeaders(res)
}
}
case parentRes = <-parentChan:
if parentRes.StatusCode == 404 {
return updateHeaders(parentRes)
}
default:

defer parentRes.Close()
}
}

if obj.HasTransform() && strings.Contains(parentRes.ContentType, "image/") {
defer res.Close()
parentRes = updateHeaders(storage.Get(parentObj))

if parentRes.StatusCode != 200 {
return updateHeaders(parentRes)
}

defer parentRes.Close()

// revers order of transforms
for i := 0; i < len(transforms)/2; i++ {
Expand Down
2 changes: 1 addition & 1 deletion response/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ func (r *Response) Close() {
func (r *Response) SetDebug(debug string) {
if debug == "1" {
r.debug = true
r.Set("Cache-Control", "no-cache")
return
}

r.debug = false
r.Set("Cache-Control", "no-cache")
}

func (r *Response) HasError() bool {
Expand Down
29 changes: 29 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,34 @@ func Get(obj *object.FileObject) *response.Response {
return prepareResponse(obj, reader, metadata)
}

func Head(obj *object.FileObject) *response.Response {
key := getKey(obj)
client, err := getClient(obj)
if err != nil {
log.Log().Infow("Storage/Get get client", "obj.Key", obj.Key, "error", err)
return response.NewError(503, err)
}

item, err := client.Item(key)
if err != nil {
if err == stow.ErrNotFound {
log.Log().Infow("Storage/Get item response", "obj.Key", obj.Key, "sc", 404)
return response.NewBuf(404, []byte(notFound))
}

log.Log().Infow("Storage/Get item response", "obj.Key", obj.Key, "error", err)
return response.NewError(500, err)
}

metadata, err := item.Metadata()
if err != nil {
log.Log().Warnw("Storage/Get read metadata", "obj.Key", obj.Key, "sc", 500, "error", err)
return response.NewError(500, err)
}

return prepareResponse(obj, nil, metadata)
}

func Set(obj *object.FileObject, _ http.Header, contentLen int64, body io.ReadCloser) *response.Response {
client, err := getClient(obj)
if err != nil {
Expand Down Expand Up @@ -224,6 +252,7 @@ func getClient(obj *object.FileObject) (stow.Container, error) {
func getKey(obj *object.FileObject) string {
return path.Join(obj.Storage.PathPrefix, obj.Key)
}

func prepareResponse(obj *object.FileObject, stream io.ReadCloser, metadata map[string]interface{}) *response.Response {
res := response.New(200, stream)

Expand Down

0 comments on commit df17c2c

Please # to comment.