diff --git a/cmd/demo/main.go b/cmd/demo/main.go index 2c59cf2..bccbe75 100644 --- a/cmd/demo/main.go +++ b/cmd/demo/main.go @@ -313,7 +313,7 @@ func main() { sos := op.NewSoS(name, &hedge.SoSOptions{ MemLimit: 150_000, DiskLimit: 120_000, - Expiration: 30, + Expiration: 5, }) writer, err := sos.Writer() @@ -448,9 +448,11 @@ func main() { Usage float64 `json:"usage"` } + // See $HOME/tmp/ locs, _ := os.ReadFile("readlocs") ss := strings.Split(string(locs), " ") + // See $HOME/tmp/ ra, err := mmap.Open("readdata") if err != nil { slog.Error(err.Error()) @@ -469,8 +471,9 @@ func main() { sos := func() *hedge.SoS { sos := op.NewSoS(name, &hedge.SoSOptions{ - MemLimit: 100_000, - Expiration: 30, + MemLimit: 10_000_000, + DiskLimit: 10_000_000, + Expiration: 5, }) writer, err := sos.Writer() @@ -511,6 +514,7 @@ func main() { writer.Close() slog.Info("total_write:", + "count", i, "val", wt, "err", writer.Err(), ) @@ -541,7 +545,7 @@ func main() { rt += len(d) } - slog.Info("total_read:", "val", rt) + slog.Info("total_read:", "count", i, "val", rt) return nil }) diff --git a/sos.go b/sos.go index 1ddecbb..a9f74ad 100644 --- a/sos.go +++ b/sos.go @@ -85,6 +85,7 @@ type SoS struct { dlock *sync.Mutex // local file lock wmtx *sync.Mutex // one active writer only writer *Writer // writer object + refs atomic.Int64 // self reference count wrefs atomic.Int64 // writer reference count rrefs atomic.Int64 // reader reference count on atomic.Int32 @@ -288,6 +289,8 @@ func (w *Writer) start() { if w.sos.data[n].bb != nil { w.sos.data[n].bufs = w.sos.data[n].bb.NewBinaryArray() w.sos.data[n].bb.Release() + w.sos.data[n].bb = nil + // slog.Info("arrow: release(bb):", "node", n) } } @@ -519,6 +522,7 @@ func (sos *SoS) Close() { } } + sos.refs.Add(-1) sos.on.Store(0) } @@ -558,10 +562,11 @@ func (sos *SoS) cleaner() { eg.Go(func() error { started := sos.start for { - time.Sleep(time.Second * 5) + time.Sleep(time.Second * 1) + refs := sos.refs.Load() wrefs := sos.wrefs.Load() rrefs := sos.rrefs.Load() - if (wrefs + rrefs) > 0 { + if (refs + wrefs + rrefs) > 0 { started = time.Now() continue } @@ -574,7 +579,8 @@ func (sos *SoS) cleaner() { for _, node := range sos.op.soss[sos.Name].nodes { if sos.data[node].bufs != nil { sos.data[node].bufs.Release() - // slog.Info("arrow: release:", "node", node) + sos.data[node].bufs = nil + // slog.Info("arrow: release(buf):", "node", node) } } }() @@ -636,6 +642,7 @@ func newSoS(name string, op *Op, opts ...*SoSOptions) *SoS { sos.age = time.Hour * 1 } + sos.refs.Add(1) sos.start = time.Now() go sos.cleaner() return sos