Skip to content

Commit f3d06d9

Browse files
committed
Add a heartbeat executor for SSE
Signed-off-by: Réda Housni Alaoui <reda-alaoui@hey.com>
1 parent ecdb633 commit f3d06d9

File tree

5 files changed

+566
-1
lines changed

5 files changed

+566
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2002-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.web.servlet.mvc.method.annotation;
18+
19+
20+
import java.io.IOException;
21+
import java.time.Duration;
22+
import java.util.Set;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.ScheduledFuture;
25+
26+
import org.jspecify.annotations.Nullable;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
import org.springframework.context.SmartLifecycle;
30+
import org.springframework.http.MediaType;
31+
import org.springframework.scheduling.TaskScheduler;
32+
33+
/**
34+
* @author Réda Housni Alaoui
35+
*/
36+
public class DefaultSseEmitterHeartbeatExecutor implements SmartLifecycle, SseEmitterHeartbeatExecutor {
37+
38+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSseEmitterHeartbeatExecutor.class);
39+
40+
private final TaskScheduler taskScheduler;
41+
private final Set<SseEmitter> emitters = ConcurrentHashMap.newKeySet();
42+
43+
private final Object lifecycleMonitor = new Object();
44+
45+
private Duration period = Duration.ofSeconds(5);
46+
private String eventName = "ping";
47+
private String eventObject = "ping";
48+
49+
private volatile boolean running;
50+
@Nullable
51+
private volatile ScheduledFuture<?> taskFuture;
52+
53+
public DefaultSseEmitterHeartbeatExecutor(TaskScheduler taskScheduler) {
54+
this.taskScheduler = taskScheduler;
55+
}
56+
57+
public void setPeriod(Duration period) {
58+
this.period = period;
59+
}
60+
61+
public void setEventName(String eventName) {
62+
this.eventName = eventName;
63+
}
64+
65+
public void setEventObject(String eventObject) {
66+
this.eventObject = eventObject;
67+
}
68+
69+
@Override
70+
public void start() {
71+
synchronized (lifecycleMonitor) {
72+
taskFuture = taskScheduler.scheduleAtFixedRate(this::ping, period);
73+
running = true;
74+
}
75+
}
76+
77+
@Override
78+
public void register(SseEmitter emitter) {
79+
Runnable closeCallback = () -> emitters.remove(emitter);
80+
emitter.onCompletion(closeCallback);
81+
emitter.onError(t -> closeCallback.run());
82+
emitter.onTimeout(closeCallback);
83+
84+
emitters.add(emitter);
85+
}
86+
87+
@Override
88+
public void stop() {
89+
synchronized (lifecycleMonitor) {
90+
ScheduledFuture<?> future = taskFuture;
91+
if (future != null) {
92+
future.cancel(true);
93+
}
94+
emitters.clear();
95+
running = false;
96+
}
97+
}
98+
99+
@Override
100+
public boolean isRunning() {
101+
return running;
102+
}
103+
104+
boolean isRegistered(SseEmitter emitter) {
105+
return emitters.contains(emitter);
106+
}
107+
108+
private void ping() {
109+
LOGGER.atDebug().log(() -> "Pinging %s emitter(s)".formatted(emitters.size()));
110+
111+
for (SseEmitter emitter : emitters) {
112+
if (Thread.currentThread().isInterrupted()) {
113+
return;
114+
}
115+
LOGGER.trace("Pinging {}", emitter);
116+
SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().name(eventName).data(eventObject, MediaType.TEXT_PLAIN);
117+
try {
118+
emitter.send(eventBuilder);
119+
} catch (IOException | RuntimeException e) {
120+
// According to SseEmitter's Javadoc, the container itself will call SseEmitter#completeWithError
121+
LOGGER.debug(e.getMessage());
122+
}
123+
}
124+
}
125+
}

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/RequestMappingHandlerAdapter.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@
123123
* @author Rossen Stoyanchev
124124
* @author Juergen Hoeller
125125
* @author Sebastien Deleuze
126+
* @author Réda Housni Alaoui
126127
* @since 3.1
127128
* @see HandlerMethodArgumentResolver
128129
* @see HandlerMethodReturnValueHandler
@@ -201,6 +202,8 @@ public class RequestMappingHandlerAdapter extends AbstractHandlerMethodAdapter
201202

202203
private final Map<ControllerAdviceBean, Set<Method>> modelAttributeAdviceCache = new LinkedHashMap<>();
203204

205+
@Nullable
206+
private SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor;
204207

205208
/**
206209
* Provide resolvers for custom argument types. Custom resolvers are ordered
@@ -526,6 +529,13 @@ public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDisc
526529
this.parameterNameDiscoverer = parameterNameDiscoverer;
527530
}
528531

532+
/**
533+
* Set the {@link SseEmitterHeartbeatExecutor} that will be used to periodically prob the SSE connection health
534+
*/
535+
public void setSseEmitterHeartbeatExecutor(@Nullable SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor) {
536+
this.sseEmitterHeartbeatExecutor = sseEmitterHeartbeatExecutor;
537+
}
538+
529539
/**
530540
* A {@link ConfigurableBeanFactory} is expected for resolving expressions
531541
* in method argument default values.
@@ -735,7 +745,7 @@ private List<HandlerMethodReturnValueHandler> getDefaultReturnValueHandlers() {
735745
handlers.add(new ViewMethodReturnValueHandler());
736746
handlers.add(new ResponseBodyEmitterReturnValueHandler(getMessageConverters(),
737747
this.reactiveAdapterRegistry, this.taskExecutor, this.contentNegotiationManager,
738-
initViewResolvers(), initLocaleResolver()));
748+
initViewResolvers(), initLocaleResolver(), this.sseEmitterHeartbeatExecutor));
739749
handlers.add(new StreamingResponseBodyReturnValueHandler());
740750
handlers.add(new HttpEntityMethodProcessor(getMessageConverters(),
741751
this.contentNegotiationManager, this.requestResponseBodyAdvice, this.errorResponseInterceptors));

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java

+28
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Collections;
2626
import java.util.List;
2727
import java.util.Locale;
28+
import java.util.Optional;
2829
import java.util.Set;
2930
import java.util.function.Consumer;
3031

@@ -89,6 +90,7 @@
8990
* </ul>
9091
*
9192
* @author Rossen Stoyanchev
93+
* @author Réda Housni Alaoui
9294
* @since 4.2
9395
*/
9496
public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodReturnValueHandler {
@@ -101,6 +103,8 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
101103

102104
private final LocaleResolver localeResolver;
103105

106+
@Nullable
107+
private final SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor;
104108

105109
/**
106110
* Simple constructor with reactive type support based on a default instance of
@@ -143,11 +147,32 @@ public ResponseBodyEmitterReturnValueHandler(
143147
ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager,
144148
List<ViewResolver> viewResolvers, @Nullable LocaleResolver localeResolver) {
145149

150+
this(messageConverters, registry, executor, manager, viewResolvers, localeResolver, null);
151+
}
152+
153+
/**
154+
* Constructor that with added arguments for view rendering.
155+
* @param messageConverters converters to write emitted objects with
156+
* @param registry for reactive return value type support
157+
* @param executor for blocking I/O writes of items emitted from reactive types
158+
* @param manager for detecting streaming media types
159+
* @param viewResolvers resolvers for fragment stream rendering
160+
* @param localeResolver the {@link LocaleResolver} for fragment stream rendering
161+
* @param sseEmitterHeartbeatExecutor for sending periodic events to SSE clients
162+
* @since 6.2
163+
*/
164+
public ResponseBodyEmitterReturnValueHandler(
165+
List<HttpMessageConverter<?>> messageConverters,
166+
ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager,
167+
List<ViewResolver> viewResolvers, @Nullable LocaleResolver localeResolver,
168+
@Nullable SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor) {
169+
146170
Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
147171
this.sseMessageConverters = initSseConverters(messageConverters);
148172
this.reactiveHandler = new ReactiveTypeHandler(registry, executor, manager, null);
149173
this.viewResolvers = viewResolvers;
150174
this.localeResolver = (localeResolver != null ? localeResolver : new AcceptHeaderLocaleResolver());
175+
this.sseEmitterHeartbeatExecutor = sseEmitterHeartbeatExecutor;
151176
}
152177

153178
private static List<HttpMessageConverter<?>> initSseConverters(List<HttpMessageConverter<?>> converters) {
@@ -239,6 +264,9 @@ public void handleReturnValue(@Nullable Object returnValue, MethodParameter retu
239264
}
240265

241266
emitter.initialize(emitterHandler);
267+
if (emitter instanceof SseEmitter sseEmitter) {
268+
Optional.ofNullable(sseEmitterHeartbeatExecutor).ifPresent(handler -> handler.register(sseEmitter));
269+
}
242270
}
243271

244272

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2002-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.web.servlet.mvc.method.annotation;
18+
19+
/**
20+
* @author Réda Housni Alaoui
21+
*/
22+
public interface SseEmitterHeartbeatExecutor {
23+
24+
void register(SseEmitter emitter);
25+
}

0 commit comments

Comments
 (0)