Skip to content

Commit

Permalink
Move OPC-UA processor and sinks to new module streampipes-connectors-…
Browse files Browse the repository at this point in the history
…opcua (#1648) (#1649)
  • Loading branch information
dominikriemer authored Jun 6, 2023
1 parent acfe67a commit 76afaef
Show file tree
Hide file tree
Showing 26 changed files with 141 additions and 63 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,7 @@
<module>streampipes-rest-extensions</module>
<module>streampipes-extensions/streampipes-connectors-influx</module>
<module>streampipes-client-api</module>
<module>streampipes-extensions/streampipes-connectors-opcua</module>
</modules>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@

<dependencies>
<!-- StreamPipes dependencies -->
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-connectors-opcua</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-pipeline-elements-shared</artifactId>
Expand Down Expand Up @@ -155,10 +160,6 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-client</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-sse</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.streampipes.connect.iiot.protocol.stream.TubeMQProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.pulsar.PulsarProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.rocketmq.RocketMQProtocol;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
Expand Down
65 changes: 65 additions & 0 deletions streampipes-extensions/streampipes-connectors-opcua/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-parent</artifactId>
<version>0.93.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>streampipes-connectors-opcua</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-extensions-api</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-extensions-management</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-model</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-sdk</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-client</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*
*/

package org.apache.streampipes.connect.iiot.adapters.opcua;
package org.apache.streampipes.extensions.connectors.opcua.adapter;

import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.connect.iiot.adapters.opcua.configuration.SpOpcUaConfig;
import org.apache.streampipes.extensions.connectors.opcua.adapter.configuration.SpOpcUaConfig;

import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.connect.iiot.adapters.opcua;
package org.apache.streampipes.extensions.connectors.opcua.adapter;

import org.apache.streampipes.sdk.utils.Datatypes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
*
*/

package org.apache.streampipes.connect.iiot.adapters.opcua;
package org.apache.streampipes.extensions.connectors.opcua.adapter;

import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.connect.iiot.adapters.opcua.configuration.SpOpcUaConfigBuilder;
import org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil;
import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
import org.apache.streampipes.extensions.api.connect.IEventCollector;
import org.apache.streampipes.extensions.api.connect.IPullAdapter;
Expand All @@ -31,6 +29,8 @@
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import org.apache.streampipes.extensions.connectors.opcua.adapter.configuration.SpOpcUaConfigBuilder;
import org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil;
import org.apache.streampipes.extensions.management.connect.PullAdapterScheduler;
import org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings;
import org.apache.streampipes.model.AdapterType;
Expand Down Expand Up @@ -62,17 +62,17 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels.AVAILABLE_NODES;
import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels.NAMESPACE_INDEX;
import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels.NODE_ID;
import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels.OPC_HOST;
import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels.OPC_HOST_OR_URL;
import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels.OPC_SERVER_HOST;
import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels.OPC_SERVER_PORT;
import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels.OPC_SERVER_URL;
import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels.OPC_URL;
import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.getSchema;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil.OpcUaLabels;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil.OpcUaLabels.AVAILABLE_NODES;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil.OpcUaLabels.NAMESPACE_INDEX;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil.OpcUaLabels.NODE_ID;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil.OpcUaLabels.OPC_HOST;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil.OpcUaLabels.OPC_HOST_OR_URL;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil.OpcUaLabels.OPC_SERVER_HOST;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil.OpcUaLabels.OPC_SERVER_PORT;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil.OpcUaLabels.OPC_SERVER_URL;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil.OpcUaLabels.OPC_URL;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil.getSchema;

public class OpcUaAdapter implements StreamPipesAdapter, IPullAdapter, SupportsRuntimeConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*
*/

package org.apache.streampipes.connect.iiot.adapters.opcua;
package org.apache.streampipes.extensions.connectors.opcua.adapter;

import org.apache.streampipes.connect.iiot.adapters.opcua.configuration.SpOpcUaConfig;
import org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaTypes;
import org.apache.streampipes.extensions.connectors.opcua.adapter.configuration.SpOpcUaConfig;
import org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaTypes;
import org.apache.streampipes.model.staticproperty.TreeInputNode;

import org.eclipse.milo.opcua.sdk.client.AddressSpace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*
*/

package org.apache.streampipes.connect.iiot.adapters.opcua;
package org.apache.streampipes.extensions.connectors.opcua.adapter;


import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.connect.iiot.adapters.opcua.configuration.SpOpcUaConfig;
import org.apache.streampipes.extensions.connectors.opcua.adapter.configuration.SpOpcUaConfig;

import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.connect.iiot.adapters.opcua.configuration;
package org.apache.streampipes.extensions.connectors.opcua.adapter.configuration;

import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*
*/

package org.apache.streampipes.connect.iiot.adapters.opcua.configuration;
package org.apache.streampipes.extensions.connectors.opcua.adapter.configuration;

import org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.connectors.opcua.adapter.utils.OpcUaUtil;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.connect.iiot.adapters.opcua.utils;
package org.apache.streampipes.extensions.connectors.opcua.adapter.utils;

import org.eclipse.milo.opcua.stack.core.UaException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*
*/

package org.apache.streampipes.connect.iiot.adapters.opcua.utils;
package org.apache.streampipes.extensions.connectors.opcua.adapter.utils;


import jakarta.annotation.Nullable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.connect.iiot.adapters.opcua.utils;
package org.apache.streampipes.extensions.connectors.opcua.adapter.utils;

import org.apache.streampipes.sdk.utils.Datatypes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
*
*/

package org.apache.streampipes.connect.iiot.adapters.opcua.utils;
package org.apache.streampipes.extensions.connectors.opcua.adapter.utils;

import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.commons.exceptions.connect.ParseException;
import org.apache.streampipes.connect.iiot.adapters.opcua.OpcNode;
import org.apache.streampipes.connect.iiot.adapters.opcua.OpcUaNodeBrowser;
import org.apache.streampipes.connect.iiot.adapters.opcua.SpOpcUaClient;
import org.apache.streampipes.connect.iiot.adapters.opcua.configuration.SpOpcUaConfigBuilder;
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcNode;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaNodeBrowser;
import org.apache.streampipes.extensions.connectors.opcua.adapter.SpOpcUaClient;
import org.apache.streampipes.extensions.connectors.opcua.adapter.configuration.SpOpcUaConfigBuilder;
import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.sinks.databases.jvm.opcua;
package org.apache.streampipes.extensions.connectors.opcua.sink;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.runtime.Event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.sinks.databases.jvm.opcua;
package org.apache.streampipes.extensions.connectors.opcua.sink;

public final class OpcUaParameters {
private final String hostname;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,25 @@
*
*/

package org.apache.streampipes.sinks.databases.jvm.opcua;
package org.apache.streampipes.extensions.connectors.opcua.sink;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;

public class OpcUaSink extends StreamPipesDataSink {
public class OpcUaSink implements IStreamPipesDataSink {

private static final String OPC_SERVER_KEY = "opc_host";
private static final String OPC_PORT_KEY = "opc_port";
Expand All @@ -44,26 +45,29 @@ public class OpcUaSink extends StreamPipesDataSink {
private OpcUa opcUa;

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.opcua")
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.FORWARD)
.requiredStream(StreamRequirementsBuilder
.create()
.requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
Labels.withId(MAPPING_PROPERTY_KEY),
PropertyScope.NONE).build())
.requiredTextParameter(Labels.withId(OPC_SERVER_KEY))
.requiredIntegerParameter(Labels.withId(OPC_PORT_KEY))
.requiredIntegerParameter(Labels.withId(OPC_NAMESPACE_INDEX_KEY))
.requiredTextParameter(Labels.withId(OPC_NODE_ID_KEY))
.build();
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
OpcUaSink::new,
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.opcua")
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.FORWARD)
.requiredStream(StreamRequirementsBuilder
.create()
.requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
Labels.withId(MAPPING_PROPERTY_KEY),
PropertyScope.NONE).build())
.requiredTextParameter(Labels.withId(OPC_SERVER_KEY))
.requiredIntegerParameter(Labels.withId(OPC_PORT_KEY))
.requiredIntegerParameter(Labels.withId(OPC_NAMESPACE_INDEX_KEY))
.requiredTextParameter(Labels.withId(OPC_NODE_ID_KEY))
.build()
);
}

@Override
public void onInvocation(SinkParams parameters,
EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
public void onPipelineStarted(IDataSinkParameters parameters,
EventSinkRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
String hostname = extractor.singleValueParameter(OPC_SERVER_KEY, String.class);
Integer port = extractor.singleValueParameter(OPC_PORT_KEY, Integer.class);
Expand Down Expand Up @@ -93,7 +97,7 @@ public void onEvent(Event event) throws SpRuntimeException {
}

@Override
public void onDetach() throws SpRuntimeException {
public void onPipelineStopped() {
this.opcUa.onDetach();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@

<dependencies>
<!-- StreamPipes dependencies -->
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-connectors-opcua</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-sdk-bundle</artifactId>
Expand Down
Loading

0 comments on commit 76afaef

Please # to comment.