15
15
*/
16
16
package org .springframework .data .redis .cache ;
17
17
18
+ import reactor .core .publisher .Mono ;
19
+
20
+ import java .nio .ByteBuffer ;
18
21
import java .nio .charset .StandardCharsets ;
19
22
import java .time .Duration ;
20
23
import java .util .concurrent .TimeUnit ;
24
+ import java .util .function .BiFunction ;
21
25
import java .util .function .Consumer ;
22
26
import java .util .function .Function ;
27
+ import java .util .function .Supplier ;
23
28
24
29
import org .springframework .dao .PessimisticLockingFailureException ;
30
+ import org .springframework .data .redis .connection .ReactiveRedisConnection ;
31
+ import org .springframework .data .redis .connection .ReactiveRedisConnectionFactory ;
25
32
import org .springframework .data .redis .connection .RedisConnection ;
26
33
import org .springframework .data .redis .connection .RedisConnectionFactory ;
27
34
import org .springframework .data .redis .connection .RedisStringCommands .SetOption ;
@@ -114,8 +121,8 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
114
121
Assert .notNull (key , "Key must not be null" );
115
122
116
123
byte [] result = shouldExpireWithin (ttl )
117
- ? execute (name , connection -> connection .stringCommands ().getEx (key , Expiration .from (ttl )))
118
- : execute (name , connection -> connection .stringCommands ().get (key ));
124
+ ? execute (name , connection -> connection .stringCommands ().getEx (key , Expiration .from (ttl )))
125
+ : execute (name , connection -> connection .stringCommands ().get (key ));
119
126
120
127
statistics .incGets (name );
121
128
@@ -128,6 +135,66 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
128
135
return result ;
129
136
}
130
137
138
+ @ Override
139
+ public Mono <ByteBuffer > retrieve (String name , byte [] key , @ Nullable Duration ttl ) {
140
+
141
+ Assert .notNull (name , "Name must not be null" );
142
+ Assert .notNull (key , "Key must not be null" );
143
+
144
+ Mono <ByteBuffer > result = nonBlockingExecutionStrategy (name ).apply (key , ttl );
145
+
146
+ result = result .doOnSuccess (byteBuffer -> {
147
+ if (byteBuffer != null ) {
148
+ statistics .incHits (name );
149
+ }
150
+ else {
151
+ statistics .incMisses (name );
152
+ }
153
+ }).doFirst (() -> statistics .incGets (name ));
154
+
155
+ return result ;
156
+ }
157
+
158
+ private BiFunction <byte [], Duration , Mono <ByteBuffer >> nonBlockingExecutionStrategy (String cacheName ) {
159
+
160
+ boolean isReactive = this .connectionFactory instanceof ReactiveRedisConnectionFactory ;
161
+
162
+ // Execution strategy that will be applied when Lettuce is used
163
+ if (isReactive ) {
164
+
165
+ return (key , ttl ) -> {
166
+
167
+ ByteBuffer wrappedKey = ByteBuffer .wrap (key );
168
+
169
+ Mono <ByteBuffer > result = shouldExpireWithin (ttl )
170
+ ? executeReactively (connection -> connection .stringCommands ().getEx (wrappedKey , Expiration .from (ttl )))
171
+ : executeReactively (connection -> connection .stringCommands ().get (wrappedKey ));
172
+
173
+ result .doFirst (() -> executeLockFree (connection ->
174
+ checkAndPotentiallyWaitUntilUnlocked (cacheName , connection )));
175
+
176
+ return result ;
177
+ };
178
+ }
179
+
180
+ // Execution strategy that will be applied when Jedis is used
181
+ return (key , ttl ) -> {
182
+
183
+ Supplier <ByteBuffer > getKey = () -> execute (cacheName , connection ->
184
+ nullSafeByteBufferWrap (connection .stringCommands ().get (key )));
185
+
186
+ Supplier <ByteBuffer > getKeyWithExpiration = () -> execute (cacheName , connection ->
187
+ nullSafeByteBufferWrap (connection .stringCommands ().getEx (key , Expiration .from (ttl ))));
188
+
189
+ return shouldExpireWithin (ttl ) ? Mono .fromSupplier (getKeyWithExpiration ) : Mono .fromSupplier (getKey );
190
+ };
191
+ }
192
+
193
+ @ Nullable
194
+ private ByteBuffer nullSafeByteBufferWrap (@ Nullable byte [] value ) {
195
+ return value != null ? ByteBuffer .wrap (value ) : null ;
196
+ }
197
+
131
198
@ Override
132
199
public void put (String name , byte [] key , byte [] value , @ Nullable Duration ttl ) {
133
200
@@ -308,6 +375,18 @@ private void executeLockFree(Consumer<RedisConnection> callback) {
308
375
}
309
376
}
310
377
378
+ private <T > T executeReactively (Function <ReactiveRedisConnection , T > callback ) {
379
+
380
+ ReactiveRedisConnection connection = getReactiveRedisConnectionFactory ().getReactiveConnection ();
381
+
382
+ try {
383
+ return callback .apply (connection );
384
+ }
385
+ finally {
386
+ connection .closeLater ();
387
+ }
388
+ }
389
+
311
390
private void checkAndPotentiallyWaitUntilUnlocked (String name , RedisConnection connection ) {
312
391
313
392
if (!isLockingCacheWriter ()) {
@@ -333,11 +412,15 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
333
412
}
334
413
}
335
414
415
+ private ReactiveRedisConnectionFactory getReactiveRedisConnectionFactory () {
416
+ return (ReactiveRedisConnectionFactory ) this .connectionFactory ;
417
+ }
418
+
336
419
private static byte [] createCacheLockKey (String name ) {
337
420
return (name + "~lock" ).getBytes (StandardCharsets .UTF_8 );
338
421
}
339
422
340
- private boolean isTrue (@ Nullable Boolean value ) {
423
+ private static boolean isTrue (@ Nullable Boolean value ) {
341
424
return Boolean .TRUE .equals (value );
342
425
}
343
426
0 commit comments