Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

#2055: provide entity-revision header for all command responses and all events #2121

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .run/ConnectivityService.run.xml
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="ConnectivityService" type="Application" factoryName="Application" folderName="Ditto" nameIsGenerated="true">
<envs>
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="CLEANUP_HISTORY_RETENTION_DURATION" value="6d" />
<env name="CONNECTION_EVENT_HISTORICAL_HEADERS_TO_PERSIST.0" value="ditto-originator" />
<env name="DITTO_TRACING_ENABLED" value="true" />
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="DITTO_TRACING_SAMPLER" value="always" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.eclipse.ditto.connectivity.service.ConnectivityService" />
<module name="ditto-connectivity-service" />
Expand Down
2 changes: 2 additions & 0 deletions .run/GatewayService.run.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="GatewayService" type="Application" factoryName="Application" folderName="Ditto">
<envs>
<env name="DITTO_TRACING_ENABLED" value="true" />
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="DITTO_TRACING_SAMPLER" value="always" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.eclipse.ditto.gateway.service.starter.GatewayService" />
<module name="ditto-gateway-service" />
Expand Down
4 changes: 3 additions & 1 deletion .run/PoliciesService.run.xml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="PoliciesService" type="Application" factoryName="Application" folderName="Ditto" nameIsGenerated="true">
<envs>
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="CLEANUP_HISTORY_RETENTION_DURATION" value="5d" />
<env name="DITTO_TRACING_ENABLED" value="true" />
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="POLICY_EVENT_HISTORICAL_HEADERS_TO_PERSIST.0" value="ditto-originator" />
<env name="POLICY_EVENT_HISTORICAL_HEADERS_TO_PERSIST.1" value="correlation-id" />
<env name="DITTO_TRACING_SAMPLER" value="always" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.eclipse.ditto.policies.service.starter.PoliciesService" />
<module name="ditto-policies-service" />
Expand Down
6 changes: 4 additions & 2 deletions .run/SearchService.run.xml
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="SearchService" type="Application" factoryName="Application" folderName="Ditto" nameIsGenerated="true">
<envs>
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="ACTIVITY_CHECK_INTERVAL" value="1h" />
<env name="BACKGROUND_SYNC_QUIET_PERIOD" value="1h" />
<env name="DITTO_TRACING_ENABLED" value="true" />
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="THINGS_SEARCH_UPDATER_STREAM_PERSISTENCE_MAX_BULK_SIZE" value="8" />
<env name="THINGS_SEARCH_UPDATER_STREAM_PERSISTENCE_PARALLELISM" value="4" />
<env name="THINGS_SEARCH_UPDATER_STREAM_WRITE_INTERVAL" value="1s" />
<env name="THINGS_SEARCH_UPDATER_STREAM_PERSISTENCE_MAX_BULK_SIZE" value="8" />
<env name="DITTO_TRACING_SAMPLER" value="always" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.eclipse.ditto.thingsearch.service.starter.SearchService" />
<module name="ditto-thingsearch-service" />
Expand Down
4 changes: 3 additions & 1 deletion .run/ThingsService.run.xml
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="ThingsService" type="Application" factoryName="Application" folderName="Ditto" nameIsGenerated="true">
<envs>
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="CLEANUP_HISTORY_RETENTION_DURATION" value="24h" />
<env name="DITTO_TRACING_ENABLED" value="true" />
<env name="LOG_LEVEL_APPLICATION" value="INFO" />
<env name="DITTO_TRACING_SAMPLER" value="always" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.eclipse.ditto.things.service.starter.ThingsService" />
<module name="ditto-things-service" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,17 @@ public enum DittoHeaderDefinition implements HeaderDefinition {
true,
HeaderValueValidators.getJsonObjectValidator()),

/**
* Header containing the current revision of the entity (e.g. thing/policy) which was e.g. modified or fetched.
*
* @since 3.7.0
*/
ENTITY_REVISION("entity-revision",
Long.class,
false,
true,
HeaderValueValidators.getLongValidator()),

/**
* Internal header containing the pre-defined configured {@code extraFields} as list of jsonPointers for the
* emitted thing event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.json.JsonFactory;
Expand Down Expand Up @@ -65,7 +66,12 @@ protected AbstractEventsourcedEvent(final String type,
final long revision,
final JsonFieldDefinition<String> entityIdFieldDefinition) {

super(type, timestamp, dittoHeaders, metadata);
super(type, timestamp,
dittoHeaders.toBuilder()
.putHeader(DittoHeaderDefinition.ENTITY_REVISION.getKey(), String.valueOf(revision))
.build(),
metadata
);
this.entityId = checkNotNull(entityId, "entityId");
this.revision = revision;
this.entityIdFieldDefinition = entityIdFieldDefinition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ public final class ImmutableDittoHeadersTest {
.set(DittoHeaderDefinition.ORIGINATOR.getKey(), "foo:bar")
.build();

private static final Long KNOWN_ENTITY_REVISION = 42L;

private static final JsonArray KNOWN_PRE_DEFINED_EXTRA_FIELDS = JsonArray.newBuilder()
.add("foo:bar:123")
.build();
Expand Down Expand Up @@ -215,6 +217,7 @@ public void settingAllKnownHeadersWorksAsExpected() {
.putHeader(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_REVISION))
.putHeader(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_TIMESTAMP))
.putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS.formatAsString())
.putHeader(DittoHeaderDefinition.ENTITY_REVISION.getKey(), String.valueOf(KNOWN_ENTITY_REVISION))
.putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(),
KNOWN_PRE_DEFINED_EXTRA_FIELDS.formatAsString())
.putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(),
Expand Down Expand Up @@ -551,6 +554,7 @@ public void toJsonReturnsExpected() {
.set(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), KNOWN_AT_HISTORICAL_REVISION)
.set(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), KNOWN_AT_HISTORICAL_TIMESTAMP.toString())
.set(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS)
.set(DittoHeaderDefinition.ENTITY_REVISION.getKey(), KNOWN_ENTITY_REVISION)
.set(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(), KNOWN_PRE_DEFINED_EXTRA_FIELDS)
.set(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(),
KNOWN_PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT)
Expand Down Expand Up @@ -795,6 +799,7 @@ private static Map<String, String> createMapContainingAllKnownHeaders() {
result.put(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_REVISION));
result.put(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_TIMESTAMP));
result.put(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS.formatAsString());
result.put(DittoHeaderDefinition.ENTITY_REVISION.getKey(), String.valueOf(KNOWN_ENTITY_REVISION));
result.put(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(),
KNOWN_PRE_DEFINED_EXTRA_FIELDS.formatAsString());
result.put(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectivityEvent;
import org.eclipse.ditto.connectivity.service.messaging.persistence.stages.ConnectionAction;
import org.eclipse.ditto.connectivity.service.messaging.persistence.stages.ConnectionState;
import org.eclipse.ditto.connectivity.service.messaging.persistence.stages.StagedCommand;
import org.eclipse.ditto.internal.utils.persistentactors.results.Result;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectivityEvent;

/**
* Abstract base class for ephemeral strategies not affecting the persistence.
Expand All @@ -53,7 +53,8 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
final C command,
@Nullable final Metadata metadata) {

final WithDittoHeaders response = getResponse(context.getState(), command.getDittoHeaders());
final WithDittoHeaders response = getResponse(context.getState(),
createCommandResponseDittoHeaders(command.getDittoHeaders(), nextRevision));
final List<ConnectionAction> actions = getActions();
return newMutationResult(StagedCommand.of(command, null, response, actions), null, response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
final ConnectivityEvent<?> event = ConnectionClosed.of(context.getState().id(), nextRevision,
getEventTimestamp(), command.getDittoHeaders(), metadata);
final WithDittoHeaders response =
CloseConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
CloseConnectionResponse.of(context.getState().id(),
createCommandResponseDittoHeaders(command.getDittoHeaders(), nextRevision));
final List<ConnectionAction> actions =
Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.UPDATE_SUBSCRIPTIONS, ConnectionAction.CLOSE_CONNECTION, ConnectionAction.STOP_CLIENT_ACTORS,
ConnectionAction.SEND_RESPONSE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -81,11 +82,11 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
.created(timestamp)
.modified(timestamp)
.build();
final DittoHeaders dittoHeaders = command.getDittoHeaders();
final ConnectivityEvent<?> event =
ConnectionCreated.of(connection, nextRevision, getEventTimestamp(), command.getDittoHeaders(),
metadata);
ConnectionCreated.of(connection, nextRevision, getEventTimestamp(), dittoHeaders, metadata);
final WithDittoHeaders response =
CreateConnectionResponse.of(connection, command.getDittoHeaders());
CreateConnectionResponse.of(connection, createCommandResponseDittoHeaders(dittoHeaders, nextRevision));
final Optional<DittoRuntimeException> validationError = validate(context, command);
if (validationError.isPresent()) {
return newErrorResult(validationError.get(), command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -50,10 +51,12 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
final DeleteConnection command,
@Nullable final Metadata metadata) {

final DittoHeaders dittoHeaders = command.getDittoHeaders();
final ConnectivityEvent<?> event = ConnectionDeleted.of(context.getState().id(), nextRevision,
getEventTimestamp(), command.getDittoHeaders(), metadata);
getEventTimestamp(), dittoHeaders, metadata);
final WithDittoHeaders response =
DeleteConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
DeleteConnectionResponse.of(context.getState().id(),
createCommandResponseDittoHeaders(dittoHeaders, nextRevision));
// Not closing the connection asynchronously; rely on client actors to cleanup all resources when stopped.
final List<ConnectionAction> actions =
Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.UPDATE_SUBSCRIPTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -61,22 +62,23 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
.revision(nextRevision)
.modified(eventTs)
.build();
if (entity != null && entity.getConnectionType() != connection.getConnectionType() &&
!command.getDittoHeaders().isSudo()) {
final DittoHeaders dittoHeaders = command.getDittoHeaders();
if (entity != null && entity.getConnectionType() != connection.getConnectionType() && !dittoHeaders.isSudo()) {
return ResultFactory.newErrorResult(
ConnectionConfigurationInvalidException
.newBuilder("ConnectionType <" + connection.getConnectionType().getName() +
"> of existing connection <" + context.getState().id() + "> cannot be changed!")
.dittoHeaders(command.getDittoHeaders())
.dittoHeaders(dittoHeaders)
.build(),
command
);
}
final ConnectivityEvent<?> event =
ConnectionModified.of(connection, nextRevision, getEventTimestamp(), command.getDittoHeaders(),
ConnectionModified.of(connection, nextRevision, getEventTimestamp(), dittoHeaders,
metadata);
final WithDittoHeaders response =
ModifyConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
ModifyConnectionResponse.of(context.getState().id(),
createCommandResponseDittoHeaders(dittoHeaders, nextRevision));
final boolean isCurrentConnectionOpen = Optional.ofNullable(entity)
.map(c -> c.getConnectionStatus() == ConnectivityStatus.OPEN)
.orElse(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -60,10 +61,12 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
if (validationError.isPresent()) {
return newErrorResult(validationError.get(), command);
} else {
final DittoHeaders dittoHeaders = command.getDittoHeaders();
final ConnectivityEvent<?> event = ConnectionOpened.of(context.getState().id(), nextRevision,
getEventTimestamp(), command.getDittoHeaders(), metadata);
getEventTimestamp(), dittoHeaders, metadata);
final WithDittoHeaders response =
OpenConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
OpenConnectionResponse.of(context.getState().id(),
createCommandResponseDittoHeaders(dittoHeaders, nextRevision));
final List<ConnectionAction> actions =
Arrays.asList(ENABLE_LOGGING, PERSIST_AND_APPLY_EVENT, OPEN_CONNECTION, UPDATE_SUBSCRIPTIONS,
SEND_RESPONSE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -48,7 +49,8 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co

if (entity != null) {
return ResultFactory.newQueryResult(command,
appendETagHeaderIfProvided(command, getRetrieveConnectionResponse(entity, command), entity)
appendETagHeaderIfProvided(command, getRetrieveConnectionResponse(entity, command, nextRevision),
entity)
);
} else {
return ResultFactory.newErrorResult(notAccessible(context, command), command);
Expand All @@ -66,14 +68,15 @@ public Optional<EntityTag> nextEntityTag(final RetrieveConnection command, @Null
return Optional.ofNullable(newEntity).flatMap(EntityTag::fromEntity);
}

private static DittoHeadersSettable<?> getRetrieveConnectionResponse(@Nullable final Connection connection,
final ConnectivityQueryCommand<RetrieveConnection> command) {
private DittoHeadersSettable<?> getRetrieveConnectionResponse(@Nullable final Connection connection,
final ConnectivityQueryCommand<RetrieveConnection> command, final long nextRevision) {
final DittoHeaders dittoHeaders = command.getDittoHeaders();
if (connection != null) {
return RetrieveConnectionResponse.of(getConnectionJson(connection, command),
command.getDittoHeaders());
createCommandResponseDittoHeaders(dittoHeaders, nextRevision-1));
} else {
return ConnectionNotAccessibleException.newBuilder(((RetrieveConnection) command).getEntityId())
.dittoHeaders(command.getDittoHeaders())
.dittoHeaders(dittoHeaders)
.build();
}
}
Expand Down
Loading
Loading