Skip to content

Commit

Permalink
#2055: provide entity-revision header for all command responses and a…
Browse files Browse the repository at this point in the history
…ll events
  • Loading branch information
thjaeckle committed Feb 14, 2025
1 parent 2de8ee8 commit 05d5d34
Show file tree
Hide file tree
Showing 110 changed files with 627 additions and 297 deletions.
4 changes: 3 additions & 1 deletion .run/ConnectivityService.run.xml
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="ConnectivityService" type="Application" factoryName="Application" folderName="Ditto" nameIsGenerated="true">
<envs>
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="CLEANUP_HISTORY_RETENTION_DURATION" value="6d" />
<env name="CONNECTION_EVENT_HISTORICAL_HEADERS_TO_PERSIST.0" value="ditto-originator" />
<env name="DITTO_TRACING_ENABLED" value="true" />
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="DITTO_TRACING_SAMPLER" value="always" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.eclipse.ditto.connectivity.service.ConnectivityService" />
<module name="ditto-connectivity-service" />
Expand Down
2 changes: 2 additions & 0 deletions .run/GatewayService.run.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="GatewayService" type="Application" factoryName="Application" folderName="Ditto">
<envs>
<env name="DITTO_TRACING_ENABLED" value="true" />
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="DITTO_TRACING_SAMPLER" value="always" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.eclipse.ditto.gateway.service.starter.GatewayService" />
<module name="ditto-gateway-service" />
Expand Down
4 changes: 3 additions & 1 deletion .run/PoliciesService.run.xml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="PoliciesService" type="Application" factoryName="Application" folderName="Ditto" nameIsGenerated="true">
<envs>
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="CLEANUP_HISTORY_RETENTION_DURATION" value="5d" />
<env name="DITTO_TRACING_ENABLED" value="true" />
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="POLICY_EVENT_HISTORICAL_HEADERS_TO_PERSIST.0" value="ditto-originator" />
<env name="POLICY_EVENT_HISTORICAL_HEADERS_TO_PERSIST.1" value="correlation-id" />
<env name="DITTO_TRACING_SAMPLER" value="always" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.eclipse.ditto.policies.service.starter.PoliciesService" />
<module name="ditto-policies-service" />
Expand Down
6 changes: 4 additions & 2 deletions .run/SearchService.run.xml
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="SearchService" type="Application" factoryName="Application" folderName="Ditto" nameIsGenerated="true">
<envs>
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="ACTIVITY_CHECK_INTERVAL" value="1h" />
<env name="BACKGROUND_SYNC_QUIET_PERIOD" value="1h" />
<env name="DITTO_TRACING_ENABLED" value="true" />
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="THINGS_SEARCH_UPDATER_STREAM_PERSISTENCE_MAX_BULK_SIZE" value="8" />
<env name="THINGS_SEARCH_UPDATER_STREAM_PERSISTENCE_PARALLELISM" value="4" />
<env name="THINGS_SEARCH_UPDATER_STREAM_WRITE_INTERVAL" value="1s" />
<env name="THINGS_SEARCH_UPDATER_STREAM_PERSISTENCE_MAX_BULK_SIZE" value="8" />
<env name="DITTO_TRACING_SAMPLER" value="always" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.eclipse.ditto.thingsearch.service.starter.SearchService" />
<module name="ditto-thingsearch-service" />
Expand Down
4 changes: 3 additions & 1 deletion .run/ThingsService.run.xml
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="ThingsService" type="Application" factoryName="Application" folderName="Ditto" nameIsGenerated="true">
<envs>
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="CLEANUP_HISTORY_RETENTION_DURATION" value="24h" />
<env name="DITTO_TRACING_ENABLED" value="true" />
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="DITTO_TRACING_SAMPLER" value="always" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.eclipse.ditto.things.service.starter.ThingsService" />
<module name="ditto-things-service" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,17 @@ public enum DittoHeaderDefinition implements HeaderDefinition {
true,
HeaderValueValidators.getJsonObjectValidator()),

/**
* Header containing the current revision of the entity (e.g. thing/policy) which was e.g. modified or fetched.
*
* @since 3.7.0
*/
ENTITY_REVISION("entity-revision",
Long.class,
false,
true,
HeaderValueValidators.getLongValidator()),

/**
* Internal header containing the pre-defined configured {@code extraFields} as list of jsonPointers for the
* emitted thing event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.json.JsonFactory;
Expand Down Expand Up @@ -65,7 +66,12 @@ protected AbstractEventsourcedEvent(final String type,
final long revision,
final JsonFieldDefinition<String> entityIdFieldDefinition) {

super(type, timestamp, dittoHeaders, metadata);
super(type, timestamp,
dittoHeaders.toBuilder()
.putHeader(DittoHeaderDefinition.ENTITY_REVISION.getKey(), String.valueOf(revision))
.build(),
metadata
);
this.entityId = checkNotNull(entityId, "entityId");
this.revision = revision;
this.entityIdFieldDefinition = entityIdFieldDefinition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ public final class ImmutableDittoHeadersTest {
.set(DittoHeaderDefinition.ORIGINATOR.getKey(), "foo:bar")
.build();

private static final Long KNOWN_ENTITY_REVISION = 42L;

private static final JsonArray KNOWN_PRE_DEFINED_EXTRA_FIELDS = JsonArray.newBuilder()
.add("foo:bar:123")
.build();
Expand Down Expand Up @@ -215,6 +217,7 @@ public void settingAllKnownHeadersWorksAsExpected() {
.putHeader(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_REVISION))
.putHeader(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_TIMESTAMP))
.putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS.formatAsString())
.putHeader(DittoHeaderDefinition.ENTITY_REVISION.getKey(), String.valueOf(KNOWN_ENTITY_REVISION))
.putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(),
KNOWN_PRE_DEFINED_EXTRA_FIELDS.formatAsString())
.putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(),
Expand Down Expand Up @@ -551,6 +554,7 @@ public void toJsonReturnsExpected() {
.set(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), KNOWN_AT_HISTORICAL_REVISION)
.set(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), KNOWN_AT_HISTORICAL_TIMESTAMP.toString())
.set(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS)
.set(DittoHeaderDefinition.ENTITY_REVISION.getKey(), KNOWN_ENTITY_REVISION)
.set(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(), KNOWN_PRE_DEFINED_EXTRA_FIELDS)
.set(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(),
KNOWN_PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT)
Expand Down Expand Up @@ -795,6 +799,7 @@ private static Map<String, String> createMapContainingAllKnownHeaders() {
result.put(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_REVISION));
result.put(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_TIMESTAMP));
result.put(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS.formatAsString());
result.put(DittoHeaderDefinition.ENTITY_REVISION.getKey(), String.valueOf(KNOWN_ENTITY_REVISION));
result.put(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(),
KNOWN_PRE_DEFINED_EXTRA_FIELDS.formatAsString());
result.put(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectivityEvent;
import org.eclipse.ditto.connectivity.service.messaging.persistence.stages.ConnectionAction;
import org.eclipse.ditto.connectivity.service.messaging.persistence.stages.ConnectionState;
import org.eclipse.ditto.connectivity.service.messaging.persistence.stages.StagedCommand;
import org.eclipse.ditto.internal.utils.persistentactors.results.Result;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectivityEvent;

/**
* Abstract base class for ephemeral strategies not affecting the persistence.
Expand All @@ -53,7 +53,8 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
final C command,
@Nullable final Metadata metadata) {

final WithDittoHeaders response = getResponse(context.getState(), command.getDittoHeaders());
final WithDittoHeaders response = getResponse(context.getState(),
createCommandResponseDittoHeaders(command.getDittoHeaders(), nextRevision));
final List<ConnectionAction> actions = getActions();
return newMutationResult(StagedCommand.of(command, null, response, actions), null, response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
final ConnectivityEvent<?> event = ConnectionClosed.of(context.getState().id(), nextRevision,
getEventTimestamp(), command.getDittoHeaders(), metadata);
final WithDittoHeaders response =
CloseConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
CloseConnectionResponse.of(context.getState().id(),
createCommandResponseDittoHeaders(command.getDittoHeaders(), nextRevision));
final List<ConnectionAction> actions =
Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.UPDATE_SUBSCRIPTIONS, ConnectionAction.CLOSE_CONNECTION, ConnectionAction.STOP_CLIENT_ACTORS,
ConnectionAction.SEND_RESPONSE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -81,11 +82,11 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
.created(timestamp)
.modified(timestamp)
.build();
final DittoHeaders dittoHeaders = command.getDittoHeaders();
final ConnectivityEvent<?> event =
ConnectionCreated.of(connection, nextRevision, getEventTimestamp(), command.getDittoHeaders(),
metadata);
ConnectionCreated.of(connection, nextRevision, getEventTimestamp(), dittoHeaders, metadata);
final WithDittoHeaders response =
CreateConnectionResponse.of(connection, command.getDittoHeaders());
CreateConnectionResponse.of(connection, createCommandResponseDittoHeaders(dittoHeaders, nextRevision));
final Optional<DittoRuntimeException> validationError = validate(context, command);
if (validationError.isPresent()) {
return newErrorResult(validationError.get(), command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -50,10 +51,12 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
final DeleteConnection command,
@Nullable final Metadata metadata) {

final DittoHeaders dittoHeaders = command.getDittoHeaders();
final ConnectivityEvent<?> event = ConnectionDeleted.of(context.getState().id(), nextRevision,
getEventTimestamp(), command.getDittoHeaders(), metadata);
getEventTimestamp(), dittoHeaders, metadata);
final WithDittoHeaders response =
DeleteConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
DeleteConnectionResponse.of(context.getState().id(),
createCommandResponseDittoHeaders(dittoHeaders, nextRevision));
// Not closing the connection asynchronously; rely on client actors to cleanup all resources when stopped.
final List<ConnectionAction> actions =
Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.UPDATE_SUBSCRIPTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -61,22 +62,23 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
.revision(nextRevision)
.modified(eventTs)
.build();
if (entity != null && entity.getConnectionType() != connection.getConnectionType() &&
!command.getDittoHeaders().isSudo()) {
final DittoHeaders dittoHeaders = command.getDittoHeaders();
if (entity != null && entity.getConnectionType() != connection.getConnectionType() && !dittoHeaders.isSudo()) {
return ResultFactory.newErrorResult(
ConnectionConfigurationInvalidException
.newBuilder("ConnectionType <" + connection.getConnectionType().getName() +
"> of existing connection <" + context.getState().id() + "> cannot be changed!")
.dittoHeaders(command.getDittoHeaders())
.dittoHeaders(dittoHeaders)
.build(),
command
);
}
final ConnectivityEvent<?> event =
ConnectionModified.of(connection, nextRevision, getEventTimestamp(), command.getDittoHeaders(),
ConnectionModified.of(connection, nextRevision, getEventTimestamp(), dittoHeaders,
metadata);
final WithDittoHeaders response =
ModifyConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
ModifyConnectionResponse.of(context.getState().id(),
createCommandResponseDittoHeaders(dittoHeaders, nextRevision));
final boolean isCurrentConnectionOpen = Optional.ofNullable(entity)
.map(c -> c.getConnectionStatus() == ConnectivityStatus.OPEN)
.orElse(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -60,10 +61,12 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
if (validationError.isPresent()) {
return newErrorResult(validationError.get(), command);
} else {
final DittoHeaders dittoHeaders = command.getDittoHeaders();
final ConnectivityEvent<?> event = ConnectionOpened.of(context.getState().id(), nextRevision,
getEventTimestamp(), command.getDittoHeaders(), metadata);
getEventTimestamp(), dittoHeaders, metadata);
final WithDittoHeaders response =
OpenConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
OpenConnectionResponse.of(context.getState().id(),
createCommandResponseDittoHeaders(dittoHeaders, nextRevision));
final List<ConnectionAction> actions =
Arrays.asList(ENABLE_LOGGING, PERSIST_AND_APPLY_EVENT, OPEN_CONNECTION, UPDATE_SUBSCRIPTIONS,
SEND_RESPONSE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -48,7 +49,8 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co

if (entity != null) {
return ResultFactory.newQueryResult(command,
appendETagHeaderIfProvided(command, getRetrieveConnectionResponse(entity, command), entity)
appendETagHeaderIfProvided(command, getRetrieveConnectionResponse(entity, command, nextRevision),
entity)
);
} else {
return ResultFactory.newErrorResult(notAccessible(context, command), command);
Expand All @@ -66,14 +68,15 @@ public Optional<EntityTag> nextEntityTag(final RetrieveConnection command, @Null
return Optional.ofNullable(newEntity).flatMap(EntityTag::fromEntity);
}

private static DittoHeadersSettable<?> getRetrieveConnectionResponse(@Nullable final Connection connection,
final ConnectivityQueryCommand<RetrieveConnection> command) {
private DittoHeadersSettable<?> getRetrieveConnectionResponse(@Nullable final Connection connection,
final ConnectivityQueryCommand<RetrieveConnection> command, final long nextRevision) {
final DittoHeaders dittoHeaders = command.getDittoHeaders();
if (connection != null) {
return RetrieveConnectionResponse.of(getConnectionJson(connection, command),
command.getDittoHeaders());
createCommandResponseDittoHeaders(dittoHeaders, nextRevision-1));
} else {
return ConnectionNotAccessibleException.newBuilder(((RetrieveConnection) command).getEntityId())
.dittoHeaders(command.getDittoHeaders())
.dittoHeaders(dittoHeaders)
.build();
}
}
Expand Down
Loading

0 comments on commit 05d5d34

Please # to comment.