Skip to content

Commit dbae86d

Browse files
committed
feat: remove versioning from flow restore process
1 parent 4c12573 commit dbae86d

File tree

4 files changed

+103
-228
lines changed

4 files changed

+103
-228
lines changed

demos/data-lakehouse-iceberg-trino-spark/create-nifi-ingestion-job.yaml

+25-59
Original file line numberDiff line numberDiff line change
@@ -66,70 +66,36 @@ data:
6666
service_login(username=USERNAME, password=PASSWORD)
6767
print("Logged in")
6868
69-
organization = "stackabletech"
70-
repository = "demos"
71-
branch = "main"
72-
version = "main"
73-
directory = "demos/data-lakehouse-iceberg-trino-spark"
74-
flow_name = "LakehouseKafkaIngest"
69+
response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/data-lakehouse-iceberg-trino-spark/LakehouseKafkaIngest.json")
7570
76-
# Check if the GitHub flow registry client already exists
77-
flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries
71+
filename = "/tmp/LakehouseKafkaIngest.json"
72+
with open(filename, "wb") as f:
73+
f.write(response.content)
7874
79-
github_client = None
80-
for client in flow_registry_clients:
81-
if client.component.name == "GitHubFlowRegistryClient":
82-
github_client = client
83-
print("Found existing GitHub flow registry client")
84-
break
75+
pg_id = get_root_pg_id()
8576
86-
if not github_client:
87-
print("Creating new GitHub flow registry client")
88-
github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
89-
body={
90-
"revision": {"version": 0},
91-
"component": {
92-
"name": "GitHubFlowRegistryClient",
93-
"type": "org.apache.nifi.github.GitHubFlowRegistryClient",
94-
"properties": {
95-
"Repository Owner": organization,
96-
"Repository Name": repository,
97-
},
98-
"bundle": {
99-
"group": "org.apache.nifi",
100-
"artifact": "nifi-github-nar",
101-
"version": "2.2.0",
102-
},
103-
},
104-
}
105-
)
77+
if not nipyapi.config.nifi_config.api_client:
78+
nipyapi.config.nifi_config.api_client = ApiClient()
10679
107-
pg_id = get_root_pg_id()
80+
header_params = {}
81+
header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json'])
82+
header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data'])
10883
109-
try:
110-
# Create process group from the file in the Git repo
111-
nipyapi.nifi.ProcessGroupsApi().create_process_group(
112-
id=pg_id,
113-
body={
114-
"revision": {"version": 0},
115-
"component": {
116-
"position": {"x": 300, "y": 10},
117-
"versionControlInformation": {
118-
"registryId": github_client.component.id,
119-
"flowId": flow_name,
120-
"bucketId": directory,
121-
"branch": branch,
122-
"version": version,
123-
},
124-
},
125-
},
126-
)
127-
except ValueError as e:
128-
# Ignore, because nipyapi can't handle non-int versions yet
129-
if "invalid literal for int() with base 10" in str(e):
130-
print("Ignoring ValueError")
131-
else:
132-
raise e
84+
nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST',
85+
path_params={'pg_id': pg_id},
86+
header_params=header_params,
87+
_return_http_data_only=True,
88+
post_params=[
89+
('id', pg_id),
90+
('groupName', 'LakehouseKafkaIngest'),
91+
('positionX', 100),
92+
('positionY', 10),
93+
('clientId', nipyapi.nifi.FlowApi().generate_client_id()),
94+
],
95+
files={
96+
'file': filename
97+
},
98+
auth_settings=['tokenAuth'])
13399
134100
# Scheduling the `Kafka3ConnectionService` fails, if it is started before `StandardRestrictedSSLContextService`, since it depends on it
135101
# To work around this, we try to schedule the controllers multiple times

demos/nifi-kafka-druid-earthquake-data/create-nifi-ingestion-job.yaml

+26-60
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ data:
5959
from nipyapi.security import service_login
6060
import nipyapi
6161
import os
62+
import requests
6263
import urllib3
6364
6465
# As of 2022-08-29 we cant use "https://nifi:8443" here because <h2>The request contained an invalid host header [<code>nifi:8443</code>] in the request [<code>/nifi-api</code>]. Check for request manipulation or third-party intercept.</h2>
@@ -75,70 +76,35 @@ data:
7576
service_login(username=USERNAME, password=PASSWORD)
7677
print("Logged in")
7778
78-
organization = "stackabletech"
79-
repository = "demos"
80-
branch = "main"
81-
version = "main"
82-
directory = "demos/nifi-kafka-druid-earthquake-data"
83-
flow_name = "IngestEarthquakesToKafka"
79+
response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/nifi-kafka-druid-earthquake-data/IngestEarthquakesToKafka.json")
80+
filename = "/tmp/IngestEarthquakesToKafka.json"
81+
with open(filename, "wb") as f:
82+
f.write(response.content)
8483
85-
# Check if the GitHub flow registry client already exists
86-
flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries
87-
88-
github_client = None
89-
for client in flow_registry_clients:
90-
if client.component.name == "GitHubFlowRegistryClient":
91-
github_client = client
92-
print("Found existing GitHub flow registry client")
93-
break
84+
pg_id = get_root_pg_id()
9485
95-
if not github_client:
96-
print("Creating new GitHub flow registry client")
97-
github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
98-
body={
99-
"revision": {"version": 0},
100-
"component": {
101-
"name": "GitHubFlowRegistryClient",
102-
"type": "org.apache.nifi.github.GitHubFlowRegistryClient",
103-
"properties": {
104-
"Repository Owner": organization,
105-
"Repository Name": repository,
106-
},
107-
"bundle": {
108-
"group": "org.apache.nifi",
109-
"artifact": "nifi-github-nar",
110-
"version": "2.2.0",
111-
},
112-
},
113-
}
114-
)
86+
if not nipyapi.config.nifi_config.api_client:
87+
nipyapi.config.nifi_config.api_client = ApiClient()
11588
116-
pg_id = get_root_pg_id()
89+
header_params = {}
90+
header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json'])
91+
header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data'])
11792
118-
try:
119-
# Create process group from the file in the Git repo
120-
nipyapi.nifi.ProcessGroupsApi().create_process_group(
121-
id=pg_id,
122-
body={
123-
"revision": {"version": 0},
124-
"component": {
125-
"position": {"x": 300, "y": 10},
126-
"versionControlInformation": {
127-
"registryId": github_client.component.id,
128-
"flowId": flow_name,
129-
"bucketId": directory,
130-
"branch": branch,
131-
"version": version,
132-
},
133-
},
134-
},
135-
)
136-
except ValueError as e:
137-
# Ignore, because nipyapi can't handle non-int versions yet
138-
if "invalid literal for int() with base 10" in str(e):
139-
print("Ignoring ValueError")
140-
else:
141-
raise e
93+
nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST',
94+
path_params={'pg_id': pg_id},
95+
header_params=header_params,
96+
_return_http_data_only=True,
97+
post_params=[
98+
('id', pg_id),
99+
('groupName', 'IngestEarthquakesToKafka'),
100+
('positionX', 100),
101+
('positionY', 10),
102+
('clientId', nipyapi.nifi.FlowApi().generate_client_id()),
103+
],
104+
files={
105+
'file': filename
106+
},
107+
auth_settings=['tokenAuth'])
142108
143109
# Scheduling the `Kafka3ConnectionService` fails, if it is started before `StandardRestrictedSSLContextService`, since it depends on it
144110
# To work around this, we try to schedule the controllers multiple times

demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml

+26-59
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ data:
5959
from nipyapi.security import service_login
6060
import nipyapi
6161
import os
62+
import requests
6263
import urllib3
6364
6465
# As of 2022-08-29 we cant use "https://nifi:8443" here because <h2>The request contained an invalid host header [<code>nifi:8443</code>] in the request [<code>/nifi-api</code>]. Check for request manipulation or third-party intercept.</h2>
@@ -75,70 +76,36 @@ data:
7576
service_login(username=USERNAME, password=PASSWORD)
7677
print("Logged in")
7778
78-
organization = "stackabletech"
79-
repository = "demos"
80-
branch = "main"
81-
version = "main"
82-
directory = "demos/nifi-kafka-druid-water-level-data"
83-
flow_name = "IngestWaterLevelsToKafka"
79+
response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/nifi-kafka-druid-water-level-data/IngestWaterLevelsToKafka.json")
8480
85-
# Check if the GitHub flow registry client already exists
86-
flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries
81+
filename = "/tmp/IngestWaterLevelsToKafka.json"
82+
with open(filename, "wb") as f:
83+
f.write(response.content)
8784
88-
github_client = None
89-
for client in flow_registry_clients:
90-
if client.component.name == "GitHubFlowRegistryClient":
91-
github_client = client
92-
print("Found existing GitHub flow registry client")
93-
break
85+
pg_id = get_root_pg_id()
9486
95-
if not github_client:
96-
print("Creating new GitHub flow registry client")
97-
github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
98-
body={
99-
"revision": {"version": 0},
100-
"component": {
101-
"name": "GitHubFlowRegistryClient",
102-
"type": "org.apache.nifi.github.GitHubFlowRegistryClient",
103-
"properties": {
104-
"Repository Owner": organization,
105-
"Repository Name": repository,
106-
},
107-
"bundle": {
108-
"group": "org.apache.nifi",
109-
"artifact": "nifi-github-nar",
110-
"version": "2.2.0",
111-
},
112-
},
113-
}
114-
)
87+
if not nipyapi.config.nifi_config.api_client:
88+
nipyapi.config.nifi_config.api_client = ApiClient()
11589
116-
pg_id = get_root_pg_id()
90+
header_params = {}
91+
header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json'])
92+
header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data'])
11793
118-
try:
119-
# Create process group from the file in the Git repo
120-
nipyapi.nifi.ProcessGroupsApi().create_process_group(
121-
id=pg_id,
122-
body={
123-
"revision": {"version": 0},
124-
"component": {
125-
"position": {"x": 300, "y": 10},
126-
"versionControlInformation": {
127-
"registryId": github_client.component.id,
128-
"flowId": flow_name,
129-
"bucketId": directory,
130-
"branch": branch,
131-
"version": version,
132-
},
133-
},
134-
},
135-
)
136-
except ValueError as e:
137-
# Ignore, because nipyapi can't handle non-int versions yet
138-
if "invalid literal for int() with base 10" in str(e):
139-
print("Ignoring ValueError")
140-
else:
141-
raise e
94+
nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST',
95+
path_params={'pg_id': pg_id},
96+
header_params=header_params,
97+
_return_http_data_only=True,
98+
post_params=[
99+
('id', pg_id),
100+
('groupName', 'IngestWaterLevelsToKafka'),
101+
('positionX', 100),
102+
('positionY', 10),
103+
('clientId', nipyapi.nifi.FlowApi().generate_client_id()),
104+
],
105+
files={
106+
'file': filename
107+
},
108+
auth_settings=['tokenAuth'])
142109
143110
# Scheduling the `Kafka3ConnectionService` fails, if it is started before `StandardRestrictedSSLContextService`, since it depends on it
144111
# To work around this, we try to schedule the controllers multiple times

demos/signal-processing/create-nifi-ingestion-job.yaml

+26-50
Original file line numberDiff line numberDiff line change
@@ -86,60 +86,36 @@ data:
8686
service_login(username=USERNAME, password=PASSWORD)
8787
print("Logged in")
8888
89-
organization = "stackabletech"
90-
repository = "demos"
91-
branch = "main"
92-
version = "main"
93-
directory = "demos/signal-processing"
94-
flow_name = "DownloadAndWriteToDB"
89+
response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/signal-processing/DownloadAndWriteToDB.json")
9590
96-
# Register the flow registry client
97-
response = nipyapi.nifi.ControllerApi().create_flow_registry_client(
98-
body={
99-
"revision": {"version": 0},
100-
"component": {
101-
"name": "GitHubFlowRegistryClient",
102-
"type": "org.apache.nifi.github.GitHubFlowRegistryClient",
103-
"properties": {
104-
"Repository Owner": organization,
105-
"Repository Name": repository,
106-
},
107-
"bundle": {
108-
"group": "org.apache.nifi",
109-
"artifact": "nifi-github-nar",
110-
"version": "2.2.0",
111-
},
112-
},
113-
}
114-
)
91+
filename = "/tmp/DownloadAndWriteToDB.json"
92+
with open(filename, "wb") as f:
93+
f.write(response.content)
11594
11695
pg_id = get_root_pg_id()
117-
print(f"pgid={pg_id}")
11896
119-
try:
120-
# Create process group from the file in the Git repo
121-
nipyapi.nifi.ProcessGroupsApi().create_process_group(
122-
id=pg_id,
123-
body={
124-
"revision": {"version": 0},
125-
"component": {
126-
"position": {"x": 300, "y": 10},
127-
"versionControlInformation": {
128-
"registryId": response.component.id,
129-
"flowId": flow_name,
130-
"bucketId": directory,
131-
"branch": branch,
132-
"version": version,
133-
},
134-
},
135-
},
136-
)
137-
except ValueError as e:
138-
# Ignore, because nipyapi can't handle non-int versions yet
139-
if "invalid literal for int() with base 10" in str(e):
140-
print("Ignoring ValueError")
141-
else:
142-
raise e
97+
if not nipyapi.config.nifi_config.api_client:
98+
nipyapi.config.nifi_config.api_client = ApiClient()
99+
100+
header_params = {}
101+
header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json'])
102+
header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data'])
103+
104+
nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST',
105+
path_params={'pg_id': pg_id},
106+
header_params=header_params,
107+
_return_http_data_only=True,
108+
post_params=[
109+
('id', pg_id),
110+
('groupName', 'DownloadAndWriteToDB'),
111+
('positionX', 100),
112+
('positionY', 10),
113+
('clientId', nipyapi.nifi.FlowApi().generate_client_id()),
114+
],
115+
files={
116+
'file': filename
117+
},
118+
auth_settings=['tokenAuth'])
143119
144120
# Update the controller services with the correct password
145121
for controller in list_all_controllers(pg_id):

0 commit comments

Comments
 (0)