diff --git a/ext/livesync/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/LiveSyncConnector.java b/ext/livesync/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/LiveSyncConnector.java
new file mode 100644
index 00000000000..5b1fb8fc307
--- /dev/null
+++ b/ext/livesync/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/LiveSyncConnector.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.syncope.core.provisioning.java.pushpull.stream;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.syncope.core.persistence.api.entity.ConnInstance;
+import org.apache.syncope.core.provisioning.api.Connector;
+import org.identityconnectors.framework.common.objects.Attribute;
+import org.identityconnectors.framework.common.objects.AttributeDelta;
+import org.identityconnectors.framework.common.objects.ConnectorObject;
+import org.identityconnectors.framework.common.objects.ObjectClass;
+import org.identityconnectors.framework.common.objects.ObjectClassInfo;
+import org.identityconnectors.framework.common.objects.OperationOptions;
+import org.identityconnectors.framework.common.objects.SearchResult;
+import org.identityconnectors.framework.common.objects.SyncDelta;
+import org.identityconnectors.framework.common.objects.SyncResultsHandler;
+import org.identityconnectors.framework.common.objects.SyncToken;
+import org.identityconnectors.framework.common.objects.Uid;
+import org.identityconnectors.framework.common.objects.filter.Filter;
+import org.identityconnectors.framework.spi.SearchResultsHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LiveSyncConnector implements Connector {
+
+    public static final Logger LOG = LoggerFactory.getLogger(LiveSyncConnector.class);
+
+    private final SyncDelta syncDelta;
+
+    public LiveSyncConnector(final SyncDelta syncDelta) {
+        this.syncDelta = syncDelta;
+    }
+
+    @Override
+    public Uid authenticate(final String username, final String password, final OperationOptions options) {
+        // not supported: this connector only replays a single SyncDelta via sync()
+        return null;
+    }
+
+    @Override
+    public ConnInstance getConnInstance() {
+        return null;
+    }
+
+    @Override
+    public Uid create(
+            final ObjectClass objectClass,
+            final Set<Attribute> attrs,
+            final OperationOptions options,
+            final AtomicReference<Boolean> propagationAttempted) {
+
+        // propagation is not supported: live sync is pull-only
+        return null;
+    }
+
+    @Override
+    public Uid update(
+            final ObjectClass objectClass,
+            final Uid uid,
+            final Set<Attribute> attrs,
+            final OperationOptions options,
+            final AtomicReference<Boolean> propagationAttempted) {
+
+        // propagation is not supported: live sync is pull-only
+        return null;
+    }
+
+    @Override
+    public Set<AttributeDelta> updateDelta(
+            final ObjectClass objectClass,
+            final Uid uid,
+            final Set<AttributeDelta> modifications,
+            final OperationOptions options,
+            final AtomicReference<Boolean> propagationAttempted) {
+
+        // propagation is not supported: live sync is pull-only
+        return Set.of();
+    }
+
+    @Override
+    public void delete(
+            final ObjectClass objectClass,
+            final Uid uid,
+            final OperationOptions options,
+            final AtomicReference<Boolean> propagationAttempted) {
+
+        // nothing to do
+    }
+
+    @Override
+    public void sync(
+            final ObjectClass objectClass,
+            final SyncToken token,
+            final SyncResultsHandler handler,
+            final OperationOptions options) {
+
+        // replay the single delta this connector was built with
+        handler.handle(this.syncDelta);
+    }
+
+    @Override
+    public SyncToken getLatestSyncToken(final ObjectClass objectClass) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ConnectorObject getObject(
+            final ObjectClass objectClass,
+            final Attribute connObjectKey,
+            final boolean ignoreCaseMatch,
+            final OperationOptions options) {
+
+        return null;
+    }
+
+    @Override
+    public SearchResult search(
+            final ObjectClass objectClass,
+            final Filter filter,
+            final SearchResultsHandler handler,
+            final OperationOptions options) {
+
+        return null;
+    }
+
+    @Override
+    public Set<ObjectClassInfo> getObjectClassInfo() {
+        return Set.of();
+    }
+
+    @Override
+    public void validate() {
+        // nothing to do
+    }
+
+    @Override
+    public void test() {
+        // nothing to do
+    }
+
+    @Override
+    public void dispose() {
+        // nothing to do
+    }
+}
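
Reviewer note: a minimal, self-contained sketch (not part of this patch) of how LiveSyncConnector is meant to be driven. The connector ignores object class, token and options and simply replays the single SyncDelta it was constructed with into the supplied handler; class name, identifiers and attribute values below are made up.

import org.apache.syncope.core.provisioning.java.pushpull.stream.LiveSyncConnector;
import org.identityconnectors.framework.common.objects.AttributeBuilder;
import org.identityconnectors.framework.common.objects.ConnectorObject;
import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder;
import org.identityconnectors.framework.common.objects.ObjectClass;
import org.identityconnectors.framework.common.objects.SyncDelta;
import org.identityconnectors.framework.common.objects.SyncDeltaBuilder;
import org.identityconnectors.framework.common.objects.SyncDeltaType;
import org.identityconnectors.framework.common.objects.SyncToken;

public class LiveSyncConnectorSketch {

    public static void main(final String[] args) {
        // build a fake "row changed" object, as the Kafka listener would from a Debezium event
        ConnectorObject object = new ConnectorObjectBuilder().
                setObjectClass(ObjectClass.ACCOUNT).
                setUid("jdoe").
                setName("jdoe").
                addAttribute(AttributeBuilder.build("email", "jdoe@example.org")).
                build();

        SyncDelta delta = new SyncDeltaBuilder().
                setDeltaType(SyncDeltaType.CREATE_OR_UPDATE).
                setToken(new SyncToken("0")).
                setObject(object).
                build();

        // the connector hands exactly this delta to whatever handler is passed to sync()
        new LiveSyncConnector(delta).sync(ObjectClass.ACCOUNT, null, d -> {
            System.out.println("Replayed " + d.getDeltaType() + " for " + d.getUid().getUidValue());
            return true;
        }, null);
    }
}
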
diff --git a/ext/livesync/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/LiveSyncStreamPullJobDelegate.java b/ext/livesync/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/LiveSyncStreamPullJobDelegate.java
new file mode 100644
index 00000000000..f9f7c491a8d
--- /dev/null
+++ b/ext/livesync/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/LiveSyncStreamPullJobDelegate.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.syncope.core.provisioning.java.pushpull.stream;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.syncope.common.lib.to.Item;
+import org.apache.syncope.common.lib.to.Mapping;
+import org.apache.syncope.common.lib.to.Provision;
+import org.apache.syncope.common.lib.to.ProvisioningReport;
+import org.apache.syncope.common.lib.to.PullTaskTO;
+import org.apache.syncope.common.lib.types.ConflictResolutionAction;
+import org.apache.syncope.common.lib.types.IdMImplementationType;
+import org.apache.syncope.common.lib.types.MappingPurpose;
+import org.apache.syncope.common.lib.types.PullMode;
+import org.apache.syncope.core.persistence.api.dao.ImplementationDAO;
+import org.apache.syncope.core.persistence.api.dao.RealmDAO;
+import org.apache.syncope.core.persistence.api.entity.AnyType;
+import org.apache.syncope.core.persistence.api.entity.AnyUtils;
+import org.apache.syncope.core.persistence.api.entity.ExternalResource;
+import org.apache.syncope.core.persistence.api.entity.Implementation;
+import org.apache.syncope.core.persistence.api.entity.PlainSchema;
+import org.apache.syncope.core.persistence.api.entity.Realm;
+import org.apache.syncope.core.persistence.api.entity.VirSchema;
+import org.apache.syncope.core.persistence.api.entity.policy.PullCorrelationRuleEntity;
+import org.apache.syncope.core.persistence.api.entity.policy.PullPolicy;
+import org.apache.syncope.core.persistence.api.entity.task.PullTask;
+import org.apache.syncope.core.provisioning.api.Connector;
+import org.apache.syncope.core.provisioning.api.pushpull.GroupPullResultHandler;
+import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
+import org.apache.syncope.core.provisioning.api.pushpull.PullActions;
+import org.apache.syncope.core.provisioning.api.pushpull.SyncopePullResultHandler;
+import org.apache.syncope.core.provisioning.api.pushpull.stream.SyncopeStreamPullExecutor;
+import org.apache.syncope.core.provisioning.java.pushpull.PullJobDelegate;
+import org.apache.syncope.core.provisioning.java.utils.MappingUtils;
+import org.apache.syncope.core.spring.security.SecureRandomUtils;
+import org.identityconnectors.framework.common.objects.ObjectClass;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Transactional;
+
+@Transactional(readOnly = true)
+public class LiveSyncStreamPullJobDelegate extends PullJobDelegate implements SyncopeStreamPullExecutor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LiveSyncStreamPullJobDelegate.class);
+
+    @Autowired
+    private ImplementationDAO implementationDAO;
+
+    @Autowired
+    private RealmDAO realmDAO;
+
+    private PullPolicy pullPolicy(
+            final AnyType anyType,
+            final ConflictResolutionAction conflictResolutionAction,
+            final String pullCorrelationRule) {
+
+        PullCorrelationRuleEntity pullCorrelationRuleEntity = null;
+        if (pullCorrelationRule != null) {
+            Implementation impl = implementationDAO.find(pullCorrelationRule);
+            if (impl == null || !IdMImplementationType.PULL_CORRELATION_RULE.equals(impl.getType())) {
+                LOG.debug("Invalid {} {}, ignoring...", Implementation.class.getSimpleName(), pullCorrelationRule);
+            } else {
+                pullCorrelationRuleEntity = entityFactory.newEntity(PullCorrelationRuleEntity.class);
+                pullCorrelationRuleEntity.setAnyType(anyType);
+                pullCorrelationRuleEntity.setImplementation(impl);
+            }
+        }
+
+        PullPolicy pullPolicy = entityFactory.newEntity(PullPolicy.class);
+        pullPolicy.setConflictResolutionAction(conflictResolutionAction);
+        if (pullCorrelationRuleEntity != null) {
+            pullPolicy.add(pullCorrelationRuleEntity);
+            pullCorrelationRuleEntity.setPullPolicy(pullPolicy);
+        }
+
+        return pullPolicy;
+    }
+
+    private Provision provision(
+            final AnyType anyType,
+            final String keyColumn,
+            final List<String> columns) throws JobExecutionException {
+
+        Provision provision = new Provision();
+        provision.setAnyType(anyType.getKey());
+        provision.setObjectClass(ObjectClass.ACCOUNT.getObjectClassValue());
+
+        Mapping mapping = new Mapping();
+        provision.setMapping(mapping);
+
+        AnyUtils anyUtils = anyUtilsFactory.getInstance(anyType.getKind());
+        if (anyUtils.getField(keyColumn) == null) {
+            PlainSchema keyColumnSchema = plainSchemaDAO.find(keyColumn);
+            if (keyColumnSchema == null) {
+                throw new JobExecutionException("Plain Schema for key column not found: " + keyColumn);
+            }
+        }
+
+        Item connObjectKeyItem = new Item();
+        connObjectKeyItem.setConnObjectKey(true);
+        connObjectKeyItem.setExtAttrName(keyColumn);
+        connObjectKeyItem.setIntAttrName(keyColumn);
+        connObjectKeyItem.setPurpose(MappingPurpose.PULL);
+        mapping.setConnObjectKeyItem(connObjectKeyItem);
+
+        // map each remaining column exactly once as a PULL item
+        columns.stream().
+                filter(column -> anyUtils.getField(column) != null
+                || plainSchemaDAO.find(column) != null || virSchemaDAO.find(column) != null).
+                forEach(column -> {
+                    Item item = new Item();
+                    item.setExtAttrName(column);
+                    item.setIntAttrName(column);
+                    item.setPurpose(MappingPurpose.PULL);
+                    mapping.add(item);
+                });
+
+        return provision;
+    }
+
+    private ExternalResource externalResource(
+            final AnyType anyType,
+            final String keyColumn,
+            final List<String> columns,
+            final ConflictResolutionAction conflictResolutionAction,
+            final String pullCorrelationRule) throws JobExecutionException {
+
+        Provision provision = provision(anyType, keyColumn, columns);
+
+        ExternalResource resource = entityFactory.newEntity(ExternalResource.class);
+        resource.setKey("StreamPull_" + SecureRandomUtils.generateRandomUUID().toString());
+        resource.getProvisions().add(provision);
+        resource.setPullPolicy(pullPolicy(anyType, conflictResolutionAction, pullCorrelationRule));
+
+        return resource;
+    }
+
+    @Override
+    public List<ProvisioningReport> pull(
+            final AnyType anyType,
+            final String keyColumn,
+            final List<String> columns,
+            final ConflictResolutionAction conflictResolutionAction,
+            final String pullCorrelationRule,
+            final Connector connector,
+            final PullTaskTO pullTaskTO) throws JobExecutionException {
+
+        try {
+            ExternalResource resource =
+                    externalResource(anyType, keyColumn, columns, conflictResolutionAction, pullCorrelationRule);
+            LOG.debug("Synthesized ad-hoc external resource for live sync: {}", resource);
+
+            Provision provision = resource.getProvisions().get(0);
+
+            PullTask pullTask = entityFactory.newEntity(PullTask.class);
+            pullTask.setResource(resource);
+            pullTask.setMatchingRule(pullTaskTO.getMatchingRule());
+            pullTask.setUnmatchingRule(pullTaskTO.getUnmatchingRule());
+            pullTask.setPullMode(PullMode.FULL_RECONCILIATION);
+            pullTask.setPerformCreate(true);
+            pullTask.setPerformUpdate(true);
+            pullTask.setPerformDelete(true);
+            pullTask.setSyncStatus(false);
+            Realm realm = realmDAO.findByFullPath(pullTaskTO.getDestinationRealm());
+            pullTask.setDestinationRealm(realm);
+            pullTask.setRemediation(pullTaskTO.isRemediation());
+
+            profile = new ProvisioningProfile<>(connector, pullTask);
+            profile.setDryRun(false);
+            profile.setConflictResolutionAction(conflictResolutionAction);
+            profile.getActions().addAll(getPullActions(pullTaskTO.getActions().stream().
+                    map(implementationDAO::find).filter(Objects::nonNull).collect(Collectors.toList())));
+
+            for (PullActions action : profile.getActions()) {
+                action.beforeAll(profile);
+            }
+
+            SyncopePullResultHandler handler;
+            GroupPullResultHandler ghandler = buildGroupHandler();
+            switch (anyType.getKind()) {
+                case USER:
+                    handler = buildUserHandler();
+                    break;
+
+                case GROUP:
+                    handler = ghandler;
+                    break;
+
+                case ANY_OBJECT:
+                default:
+                    handler = buildAnyObjectHandler();
+            }
+            handler.setProfile(profile);
+            handler.setPullExecutor(this);
+
+            // execute pull, requesting the mapped attributes plus whatever the PullActions ask for
+            Set<String> moreAttrsToGet = new HashSet<>();
+            profile.getActions().forEach(a -> moreAttrsToGet.addAll(a.moreAttrsToGet(profile, provision)));
+            Stream<Item> mapItems = Stream.concat(
+                    MappingUtils.getPullItems(provision.getMapping().getItems().stream()),
+                    virSchemaDAO.find(resource.getKey(), anyType.getKey()).stream().
+                            map(VirSchema::asLinkingMappingItem));
+
+            // LiveSyncConnector currently ignores object class, token and options,
+            // but pass them anyway for consistency with the other pull delegates
+            connector.sync(
+                    new ObjectClass(provision.getObjectClass()),
+                    null,
+                    handler,
+                    MappingUtils.buildOperationOptions(mapItems, moreAttrsToGet.toArray(String[]::new)));
+
+            try {
+                setGroupOwners(ghandler);
+            } catch (Exception e) {
+                LOG.error("While setting group owners", e);
+            }
+
+            for (PullActions action : profile.getActions()) {
+                action.afterAll(profile);
+            }
+
+            return profile.getResults();
+        } catch (Exception e) {
+            throw e instanceof JobExecutionException
+                    ? (JobExecutionException) e
+                    : new JobExecutionException("While stream pulling", e);
+        }
+    }
+}
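
Reviewer note: a self-contained sketch (not part of this patch) of the mapping shape that provision() above synthesizes, here for a hypothetical key column username and value columns email and fullname. It only uses the common-lib TOs already imported by the delegate and leaves out the DAO-based column filtering.

import java.util.List;
import org.apache.syncope.common.lib.to.Item;
import org.apache.syncope.common.lib.to.Mapping;
import org.apache.syncope.common.lib.to.Provision;
import org.apache.syncope.common.lib.types.MappingPurpose;
import org.identityconnectors.framework.common.objects.ObjectClass;

public class StreamProvisionSketch {

    public static Provision provision(final String anyType, final String keyColumn, final List<String> columns) {
        Provision provision = new Provision();
        provision.setAnyType(anyType);
        provision.setObjectClass(ObjectClass.ACCOUNT.getObjectClassValue());

        Mapping mapping = new Mapping();
        provision.setMapping(mapping);

        // the key column becomes the ConnObjectKey item...
        Item connObjectKeyItem = new Item();
        connObjectKeyItem.setConnObjectKey(true);
        connObjectKeyItem.setExtAttrName(keyColumn);
        connObjectKeyItem.setIntAttrName(keyColumn);
        connObjectKeyItem.setPurpose(MappingPurpose.PULL);
        mapping.setConnObjectKeyItem(connObjectKeyItem);

        // ...and every remaining column becomes a plain PULL item, added exactly once
        columns.forEach(column -> {
            Item item = new Item();
            item.setExtAttrName(column);
            item.setIntAttrName(column);
            item.setPurpose(MappingPurpose.PULL);
            mapping.add(item);
        });

        return provision;
    }

    public static void main(final String[] args) {
        Provision provision = provision("USER", "username", List.of("email", "fullname"));
        System.out.println(provision.getMapping().getItems().size() + " items mapped besides the key");
    }
}
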
diff --git a/ext/livesync/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/livesync/KafkaProvisioningListener.java b/ext/livesync/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/livesync/KafkaProvisioningListener.java
index f57a071ee5d..44ed64b1bcf 100644
--- a/ext/livesync/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/livesync/KafkaProvisioningListener.java
+++ b/ext/livesync/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/livesync/KafkaProvisioningListener.java
@@ -18,17 +18,37 @@
  */
 package org.apache.syncope.core.provisioning.livesync;
 
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.syncope.common.lib.SyncopeConstants;
+import org.apache.syncope.common.lib.to.ProvisioningReport;
+import org.apache.syncope.common.lib.to.PullTaskTO;
+import org.apache.syncope.common.lib.types.AnyTypeKind;
+import org.apache.syncope.common.lib.types.ConflictResolutionAction;
+import org.apache.syncope.common.lib.types.MatchingRule;
+import org.apache.syncope.common.lib.types.UnmatchingRule;
+import org.apache.syncope.core.persistence.api.dao.AnyTypeDAO;
+import org.apache.syncope.core.persistence.api.dao.NotFoundException;
+import org.apache.syncope.core.persistence.api.entity.AnyType;
+import org.apache.syncope.core.provisioning.api.pushpull.stream.SyncopeStreamPullExecutor;
+import org.apache.syncope.core.provisioning.java.pushpull.stream.LiveSyncConnector;
+import org.apache.syncope.core.provisioning.java.pushpull.stream.LiveSyncStreamPullJobDelegate;
+import org.apache.syncope.core.spring.ApplicationContextProvider;
+import org.apache.syncope.core.spring.security.AuthContextUtils;
 import org.identityconnectors.framework.common.objects.AttributeBuilder;
 import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder;
 import org.identityconnectors.framework.common.objects.Name;
 import org.identityconnectors.framework.common.objects.ObjectClass;
+import org.identityconnectors.framework.common.objects.SyncDelta;
 import org.identityconnectors.framework.common.objects.SyncDeltaBuilder;
 import org.identityconnectors.framework.common.objects.SyncDeltaType;
 import org.identityconnectors.framework.common.objects.SyncToken;
 import org.identityconnectors.framework.common.objects.Uid;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.support.AbstractBeanDefinition;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.messaging.handler.annotation.Header;
@@ -36,15 +56,22 @@
 
 public class KafkaProvisioningListener {
 
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaProvisioningListener.class);
+    @Autowired
+    private AnyTypeDAO anyTypeDAO;
 
-    private final SyncDeltaBuilder syncDeltaBuilder = new SyncDeltaBuilder();
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaProvisioningListener.class);
 
-    @KafkaListener(id = "provisioningRegex", topicPattern = "dbserver1.inventory.*")
+    @KafkaListener(id = "provisioningRegex", topics = "dbserver1.inventory.utenti")
     public void pollTable(final @Payload DebeziumMessage payload,
             final @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Map<String, Object> primaryKey) {
+        if ("r".equals(payload.getOp())) {
+            // skip Debezium snapshot read ("r") events
+            return;
+        }
+        SyncDelta delta;
         ConnectorObjectBuilder connectorBuilder = new ConnectorObjectBuilder().setObjectClass(ObjectClass.ACCOUNT);
+        SyncDeltaBuilder syncDeltaBuilder = new SyncDeltaBuilder();
         final String uidValue = primaryKey.entrySet().iterator().next().getValue().toString();
         final Uid uid = new Uid(uidValue);
         final Name name = new Name(uidValue);
@@ -55,17 +82,49 @@ public void pollTable(final @Payload DebeziumMessage payload,
         syncDeltaBuilder.setToken(token);
         if ("d".equals(payload.getOp())) {
             syncDeltaBuilder.setDeltaType(SyncDeltaType.DELETE);
-            LOG.debug("This is my syncDeltaBuilder {}",
-                    syncDeltaBuilder.build());
+            payload.getBefore().forEach((k, v) -> connectorBuilder.addAttribute(AttributeBuilder.build(k, v)));
+            delta = syncDeltaBuilder.setObject(connectorBuilder.build()).build();
+            LOG.debug("Built SyncDelta for delete: {}", delta);
+            makeSync(delta, primaryKey.entrySet().iterator().next().getKey(),
+                    payload.getBefore().keySet().stream().collect(Collectors.toList()));
             return;
         }
+        syncDeltaBuilder.setDeltaType(SyncDeltaType.CREATE_OR_UPDATE);
         payload.getAfter().forEach((k, v) -> connectorBuilder.addAttribute(AttributeBuilder.build(k, v)));
-        if ("c".equals(payload.getOp()) || "r".equals(payload.getOp())) {
-            syncDeltaBuilder.setDeltaType(SyncDeltaType.CREATE);
-        } else if ("u".equals(payload.getOp())) {
-            syncDeltaBuilder.setDeltaType(SyncDeltaType.UPDATE);
+        delta = syncDeltaBuilder.setObject(connectorBuilder.build()).build();
+        LOG.debug("Built SyncDelta for create / update: {}", delta);
+        makeSync(delta, primaryKey.entrySet().iterator().next().getKey(),
+                payload.getAfter().keySet().stream().collect(Collectors.toList()));
+    }
+
+    private List<ProvisioningReport> makeSync(final SyncDelta delta,
+            final String idName,
+            final List<String> valueName) {
+
+        AnyType anyType = anyTypeDAO.find(AnyTypeKind.USER.name());
+        LOG.debug("AnyType {} found: {}", AnyTypeKind.USER, anyType);
+        if (anyType == null) {
+            throw new NotFoundException("AnyType '" + AnyTypeKind.USER + "'");
         }
-        LOG.debug("This is my syncDeltaBuilder {}",
-                syncDeltaBuilder.setObject(connectorBuilder.build()).build());
+
+        PullTaskTO pullTask = new PullTaskTO();
+        pullTask.setDestinationRealm(SyncopeConstants.ROOT_REALM);
+        pullTask.setRemediation(false);
+        pullTask.setMatchingRule(MatchingRule.UPDATE);
+        pullTask.setUnmatchingRule(UnmatchingRule.PROVISION);
+
+        LiveSyncConnector connector = new LiveSyncConnector(delta);
+        SyncopeStreamPullExecutor executor =
+                (SyncopeStreamPullExecutor) ApplicationContextProvider.getBeanFactory().
+                        createBean(LiveSyncStreamPullJobDelegate.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME,
+                                false);
+
+        return AuthContextUtils.callAsAdmin(SyncopeConstants.MASTER_DOMAIN,
+                () -> executor.pull(
+                        anyType,
+                        idName,
+                        valueName,
+                        ConflictResolutionAction.IGNORE,
+                        null,
+                        connector,
+                        pullTask));
     }
 }
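
Reviewer note: DebeziumMessage is referenced above but not touched by this patch. Below is a rough, hypothetical sketch of the shape the listener relies on (the Debezium envelope's op code plus the before/after row images); the real class already exists in the repository and may differ.

import java.util.Map;

// Hypothetical sketch only; the actual DebeziumMessage class is not part of this diff.
public class DebeziumMessageSketch {

    /** Debezium operation code: "c" = create, "u" = update, "d" = delete, "r" = snapshot read. */
    private String op;

    /** Row image before the change (populated for "u" and "d"). */
    private Map<String, Object> before;

    /** Row image after the change (populated for "c", "u" and "r"). */
    private Map<String, Object> after;

    public String getOp() {
        return op;
    }

    public void setOp(final String op) {
        this.op = op;
    }

    public Map<String, Object> getBefore() {
        return before;
    }

    public void setBefore(final Map<String, Object> before) {
        this.before = before;
    }

    public Map<String, Object> getAfter() {
        return after;
    }

    public void setAfter(final Map<String, Object> after) {
        this.after = after;
    }
}
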
diff --git a/fit/core-reference/pom.xml b/fit/core-reference/pom.xml
index 891f8e3b64e..1beda78ad50 100644
--- a/fit/core-reference/pom.xml
+++ b/fit/core-reference/pom.xml
@@ -758,6 +758,10 @@ under the License.
                 my_connect_configs
                 my_connect_offsets
                 my_connect_statuses
+                org.apache.kafka.connect.json.JsonConverter
+                false
+                org.apache.kafka.connect.json.JsonConverter
+                false
               %a
diff --git a/fit/core-reference/src/test/resources/livesync/mysqlInit.sql b/fit/core-reference/src/test/resources/livesync/mysqlInit.sql
index 4b51e8e7f8d..a8577a28ec6 100755
--- a/fit/core-reference/src/test/resources/livesync/mysqlInit.sql
+++ b/fit/core-reference/src/test/resources/livesync/mysqlInit.sql
@@ -127,3 +127,17 @@
 INSERT INTO geom VALUES(default, ST_GeomFromText('POINT(1 1)'), NULL),
                        (default, ST_GeomFromText('LINESTRING(2 1, 6 6)'), NULL),
                        (default, ST_GeomFromText('POLYGON((0 5, 2 5, 2 7, 0 7, 0 5))'), NULL);
+
+# Create test table for users
+CREATE TABLE utenti (
+  username VARCHAR(255) NOT NULL PRIMARY KEY,
+  firstname VARCHAR(255) NOT NULL,
+  lastname VARCHAR(255) NOT NULL,
+  email VARCHAR(255) NOT NULL UNIQUE KEY,
+  surname VARCHAR(255) NOT NULL,
+  fullname VARCHAR(255) NOT NULL UNIQUE KEY,
+  userId VARCHAR(255) NOT NULL UNIQUE KEY);
+
+INSERT INTO utenti
+VALUES('mariorossi', 'mario', 'rossi', 'mario.rossi@apache.it', 'rossi', 'mario rossi', 'mario.rossi@apache.it'),
+      ('peppino22', 'peppino', 'bianchi', 'peppino@apache.it', 'bianchi', 'peppino bianchi', 'peppino@apache.it');
\ No newline at end of file
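
Reviewer note: to exercise the new table end to end, the Debezium MySQL connector has to be registered against Kafka Connect so that changes to inventory.utenti land on the dbserver1.inventory.utenti topic consumed above. A hedged, self-contained sketch follows (not part of this patch): host names, ports, credentials and the exact Debezium property set are assumptions modeled on the Debezium MySQL tutorial and must be aligned with the containers actually started by this build.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterDebeziumConnectorSketch {

    public static void main(final String[] args) throws Exception {
        // Connector configuration in the style of the Debezium MySQL tutorial;
        // every value below (hosts, ports, credentials, topic names) is an assumption.
        String config = "{"
                + "\"name\": \"livesync-utenti-connector\","
                + "\"config\": {"
                + "\"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\","
                + "\"database.hostname\": \"localhost\","
                + "\"database.port\": \"3306\","
                + "\"database.user\": \"debezium\","
                + "\"database.password\": \"dbz\","
                + "\"database.server.id\": \"184054\","
                + "\"database.server.name\": \"dbserver1\","
                + "\"database.include.list\": \"inventory\","
                + "\"table.include.list\": \"inventory.utenti\","
                + "\"database.history.kafka.bootstrap.servers\": \"localhost:9092\","
                + "\"database.history.kafka.topic\": \"schema-changes.inventory\""
                + "}}";

        // register the connector through the Kafka Connect REST API
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8083/connectors")).
                header("Content-Type", "application/json").
                POST(HttpRequest.BodyPublishers.ofString(config)).
                build();

        HttpResponse<String> response = HttpClient.newHttpClient().
                send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}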