|
4 | 4 | "context"
|
5 | 5 | "sync"
|
6 | 6 | "time"
|
| 7 | + "unsafe" |
7 | 8 | )
|
8 | 9 |
|
9 | 10 | // NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
|
@@ -178,3 +179,222 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) {
|
178 | 179 | return a.val, a.err
|
179 | 180 | }
|
180 | 181 | }
|
| 182 | + |
| 183 | +type flatentry struct { |
| 184 | + ovfl *flatentry |
| 185 | + next unsafe.Pointer |
| 186 | + prev unsafe.Pointer |
| 187 | + cmd string |
| 188 | + key string |
| 189 | + val []byte |
| 190 | + ttl int64 |
| 191 | + siz int64 |
| 192 | + mu sync.Mutex |
| 193 | + mark int64 |
| 194 | +} |
| 195 | + |
| 196 | +func (f *flatentry) insert(e *flatentry) { |
| 197 | + f.siz += e.siz |
| 198 | + f.mu.Lock() |
| 199 | + defer f.mu.Unlock() |
| 200 | + e.ovfl = f.ovfl |
| 201 | + f.ovfl = e |
| 202 | +} |
| 203 | + |
| 204 | +func (f *flatentry) find(cmd string, ts int64) (ret RedisMessage, expired bool) { |
| 205 | + if f == nil { |
| 206 | + return |
| 207 | + } |
| 208 | + if ts >= f.ttl { |
| 209 | + expired = true |
| 210 | + return |
| 211 | + } |
| 212 | + if cmd == f.cmd { |
| 213 | + _ = ret.CacheUnmarshalView(f.val) |
| 214 | + return |
| 215 | + } |
| 216 | + f.mu.Lock() |
| 217 | + ovfl := f.ovfl |
| 218 | + f.mu.Unlock() |
| 219 | + return ovfl.find(cmd, ts) |
| 220 | +} |
| 221 | + |
| 222 | +const lrBatchSize = 64 |
| 223 | + |
| 224 | +type lrBatch struct { |
| 225 | + m map[*flatentry]struct{} |
| 226 | +} |
| 227 | + |
| 228 | +func NewFlattenCache(limit int64) CacheStore { |
| 229 | + f := &flatten{ |
| 230 | + flights: make(map[string]*adapterEntry), |
| 231 | + cache: make(map[string]*flatentry), |
| 232 | + head: &flatentry{}, |
| 233 | + tail: &flatentry{}, |
| 234 | + size: 0, |
| 235 | + limit: limit, |
| 236 | + } |
| 237 | + f.head.next = unsafe.Pointer(f.tail) |
| 238 | + f.tail.prev = unsafe.Pointer(f.head) |
| 239 | + f.lrup = sync.Pool{New: func() any { |
| 240 | + return &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)} |
| 241 | + }} |
| 242 | + return f |
| 243 | +} |
| 244 | + |
| 245 | +type flatten struct { |
| 246 | + flights map[string]*adapterEntry |
| 247 | + cache map[string]*flatentry |
| 248 | + head *flatentry |
| 249 | + tail *flatentry |
| 250 | + lrup sync.Pool |
| 251 | + mark int64 |
| 252 | + size int64 |
| 253 | + limit int64 |
| 254 | + mu sync.RWMutex |
| 255 | +} |
| 256 | + |
| 257 | +func (f *flatten) llAdd(e *flatentry) { |
| 258 | + e.mark = f.mark |
| 259 | + e.prev = f.tail.prev |
| 260 | + e.next = unsafe.Pointer(f.tail) |
| 261 | + f.tail.prev = unsafe.Pointer(e) |
| 262 | + (*flatentry)(e.prev).next = unsafe.Pointer(e) |
| 263 | +} |
| 264 | + |
| 265 | +func (f *flatten) llDel(e *flatentry) { |
| 266 | + (*flatentry)(e.prev).next = e.next |
| 267 | + (*flatentry)(e.next).prev = e.prev |
| 268 | + e.mark = 0 |
| 269 | +} |
| 270 | + |
| 271 | +func (f *flatten) llTail(e *flatentry) { |
| 272 | + f.llDel(e) |
| 273 | + f.llAdd(e) |
| 274 | +} |
| 275 | + |
| 276 | +func (f *flatten) remove(e *flatentry) { |
| 277 | + f.size -= e.siz |
| 278 | + f.llDel(e) |
| 279 | + delete(f.cache, e.key) |
| 280 | +} |
| 281 | + |
| 282 | +func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) { |
| 283 | + f.mu.RLock() |
| 284 | + e := f.cache[key] |
| 285 | + f.mu.RUnlock() |
| 286 | + ts := now.UnixMilli() |
| 287 | + if v, _ := e.find(cmd, ts); v.typ != 0 { |
| 288 | + batch := f.lrup.Get().(*lrBatch) |
| 289 | + batch.m[e] = struct{}{} |
| 290 | + if len(batch.m) == lrBatchSize { |
| 291 | + f.mu.Lock() |
| 292 | + for e := range batch.m { |
| 293 | + if e.mark == f.mark { |
| 294 | + f.llTail(e) |
| 295 | + } |
| 296 | + } |
| 297 | + f.mu.Unlock() |
| 298 | + clear(batch.m) |
| 299 | + } |
| 300 | + f.lrup.Put(batch) |
| 301 | + return v, nil |
| 302 | + } |
| 303 | + fk := key + cmd |
| 304 | + f.mu.RLock() |
| 305 | + af := f.flights[fk] |
| 306 | + f.mu.RUnlock() |
| 307 | + if af != nil { |
| 308 | + return RedisMessage{}, af |
| 309 | + } |
| 310 | + f.mu.Lock() |
| 311 | + defer f.mu.Unlock() |
| 312 | + e = f.cache[key] |
| 313 | + v, expired := e.find(cmd, ts) |
| 314 | + if v.typ != 0 { |
| 315 | + f.llTail(e) |
| 316 | + return v, nil |
| 317 | + } |
| 318 | + if expired { |
| 319 | + f.remove(e) |
| 320 | + } |
| 321 | + if af = f.flights[fk]; af != nil { |
| 322 | + return RedisMessage{}, af |
| 323 | + } |
| 324 | + f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()} |
| 325 | + return RedisMessage{}, nil |
| 326 | +} |
| 327 | + |
| 328 | +func (f *flatten) Update(key, cmd string, val RedisMessage) int64 { |
| 329 | + fk := key + cmd |
| 330 | + bs := val.CacheMarshal(nil) |
| 331 | + fe := &flatentry{cmd: cmd, val: bs, ttl: val.CachePXAT(), siz: int64(len(bs)+len(key)+len(cmd)) + int64(unsafe.Sizeof(flatentry{}))} |
| 332 | + f.mu.Lock() |
| 333 | + af := f.flights[fk] |
| 334 | + if af != nil { |
| 335 | + delete(f.flights, fk) |
| 336 | + if af.xat < fe.ttl { |
| 337 | + fe.ttl = af.xat |
| 338 | + } |
| 339 | + } |
| 340 | + f.size += fe.siz |
| 341 | + for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); { |
| 342 | + e := (*flatentry)(ep) |
| 343 | + f.remove(e) |
| 344 | + ep = e.next |
| 345 | + } |
| 346 | + if e := f.cache[key]; e == nil { |
| 347 | + fe.key = key |
| 348 | + f.cache[key] = fe |
| 349 | + f.llAdd(fe) |
| 350 | + } else { |
| 351 | + e.insert(fe) |
| 352 | + } |
| 353 | + f.mu.Unlock() |
| 354 | + if af != nil { |
| 355 | + af.set(val, nil) |
| 356 | + } |
| 357 | + return fe.ttl |
| 358 | +} |
| 359 | + |
| 360 | +func (f *flatten) Cancel(key, cmd string, err error) { |
| 361 | + fk := key + cmd |
| 362 | + f.mu.Lock() |
| 363 | + defer f.mu.Unlock() |
| 364 | + if af := f.flights[fk]; af != nil { |
| 365 | + delete(f.flights, fk) |
| 366 | + af.set(RedisMessage{}, err) |
| 367 | + } |
| 368 | +} |
| 369 | + |
| 370 | +func (f *flatten) Delete(keys []RedisMessage) { |
| 371 | + f.mu.Lock() |
| 372 | + defer f.mu.Unlock() |
| 373 | + if keys == nil { |
| 374 | + f.cache = make(map[string]*flatentry, len(f.cache)) |
| 375 | + f.head.next = unsafe.Pointer(f.tail) |
| 376 | + f.tail.prev = unsafe.Pointer(f.head) |
| 377 | + f.mark++ |
| 378 | + f.size = 0 |
| 379 | + } else { |
| 380 | + for _, k := range keys { |
| 381 | + if e := f.cache[k.string]; e != nil { |
| 382 | + f.remove(e) |
| 383 | + } |
| 384 | + } |
| 385 | + } |
| 386 | +} |
| 387 | + |
| 388 | +func (f *flatten) Close(err error) { |
| 389 | + f.mu.Lock() |
| 390 | + flights := f.flights |
| 391 | + f.flights = nil |
| 392 | + f.cache = nil |
| 393 | + f.tail = nil |
| 394 | + f.head = nil |
| 395 | + f.mark++ |
| 396 | + f.mu.Unlock() |
| 397 | + for _, entry := range flights { |
| 398 | + entry.set(RedisMessage{}, err) |
| 399 | + } |
| 400 | +} |
0 commit comments