diff --git a/knime_extension/geospatial_env.yml b/knime_extension/geospatial_env.yml index a549339f..808036d5 100644 --- a/knime_extension/geospatial_env.yml +++ b/knime_extension/geospatial_env.yml @@ -24,6 +24,7 @@ dependencies: - numpy=1.23.5 #required to fix problem with latest version of numpy - osmnx=1.7.0 - polyline 2.0.0 + - pyogrio=0.10.0 - pyproj=3.5.0 - pysal=23.1 - rasterio=1.3.6 diff --git a/knime_extension/src/nodes/io.py b/knime_extension/src/nodes/io.py index 755d5c9f..eb2fd35d 100644 --- a/knime_extension/src/nodes/io.py +++ b/knime_extension/src/nodes/io.py @@ -46,6 +46,64 @@ class ExistingFile(knext.EnumParameterOptions): ) +def validate_path(path: str) -> None: + # no path check + pass + + +def clean_dataframe(df): + df = df.reset_index(drop=True) + columns_to_drop = ["", ""] + return df.drop(columns=[col for col in columns_to_drop if col in df.columns]) + + +def check_overwrite(fileurl, existing_file): + import os + + if existing_file == ExistingFile.FAIL.name: + if os.path.exists(fileurl): + raise knext.InvalidParametersError("File already exists.") + + +class _EncodingOptions(knext.EnumParameterOptions): + AUTO = ( + "Auto", + "Automatically detect the encoding from common options", + ) + UTF8 = ( + "UTF-8", + "Unicode Transformation Format - 8 bit. Default encoding suitable for most modern GIS data files.", + ) + GB18030 = ( + "GB18030", + "Chinese National Standard encoding. More comprehensive than GBK.", + ) + GBK = ( + "GBK", + "Chinese internal code specification. Common in Chinese GIS software.", + ) + GB2312 = ( + "GB2312", + "Basic Simplified Chinese character encoding.", + ) + LATIN1 = ( + "ISO-8859-1", + "Latin-1 encoding. Suitable for Western European languages.", + ) + WINDOWS1252 = ( + "Windows-1252", + "Windows Western European encoding. Common in Windows systems.", + ) + ASCII = ( + "ASCII", + "Basic ASCII encoding. Only for standard ASCII characters.", + ) + + @classmethod + def get_default(cls): + return cls.AUTO + + ############################################ # GeoFile Reader ############################################ @@ -87,10 +145,20 @@ class ExistingFile(knext.EnumParameterOptions): }, ) class GeoFileReaderNode: - data_url = knext.StringParameter( + data_url = knext.LocalPathParameter( "Input file path", - "The file path for reading data.", - "", + "Select the file path for reading data.", + placeholder_text="Select input file path...", + validator=validate_path, + ) + + encoding = knext.EnumParameter( + label="Encoding", + description="Select the encoding for reading the data file.", + default_value=_EncodingOptions.get_default().name, + enum=_EncodingOptions, + since_version="1.3.0", + is_advanced=True, ) def configure(self, configure_context): @@ -131,19 +199,27 @@ def execute(self, exec_context: knext.ExecutionContext): or self.data_url.lower().endswith(".parquet.snappy") ): gdf = gp.read_parquet(self.data_url) + else: - gdf = gp.read_file(self.data_url) + if self.encoding == _EncodingOptions.AUTO.name: + gdf = gp.read_file(self.data_url, engine="pyogrio", on_invalid="ignore") + else: + gdf = gp.read_file( + self.data_url, + encoding=self.encoding, + engine="pyogrio", + on_invalid="ignore", + ) - if "" in gdf.columns: - gdf = gdf.drop(columns="") - if "" in gdf.columns: - gdf = gdf.drop(columns="") + gdf = clean_dataframe(gdf) return knext.Table.from_pandas(gdf) ############################################ # GeoFile Writer ############################################ + + @knext.node( name="GeoFile Writer", node_type=knext.NodeType.SINK, @@ -171,6 +247,7 @@ def execute(self, exec_context: knext.ExecutionContext): }, ) class GeoFileWriterNode: + geo_col = knext.ColumnParameter( "Geometry column", "Select the geometry column for Geodata.", @@ -180,11 +257,11 @@ class GeoFileWriterNode: include_none_column=False, ) - data_url = knext.StringParameter( + data_url = knext.LocalPathParameter( "Output file path", - """The file path for writing data. The file extension e.g. *.shp*, *.geojson*, or *.parquet* is appended -automatically depending on the selected file format if not specified.""", - "", + "Select the file path for saving data.", + placeholder_text="Select output file path...", + validator=validate_path, ) existing_file = knext.EnumParameter( @@ -203,7 +280,7 @@ class GeoFileWriterNode: "Output file format", "The file format to use.", "Shapefile", - enum=["Shapefile", "GeoJSON", "GeoParquet"], + enum=["Shapefile", "GeoJSON", "GeoParquet", "GML"], ) parquet_compression = knext.EnumParameter( @@ -214,6 +291,15 @@ class GeoFileWriterNode: since_version="1.2.0", ).rule(knext.OneOf(dataformat, ["GeoParquet"]), knext.Effect.SHOW) + encoding = knext.EnumParameter( + label="Encoding", + description="Select the encoding for reading the data file.", + default_value=_EncodingOptions.get_default().name, + enum=_EncodingOptions, + since_version="1.3.0", + is_advanced=True, + ) + def configure(self, configure_context, input_schema): self.geo_col = knut.column_exists_or_preset( configure_context, self.geo_col, input_schema, knut.is_geo @@ -225,6 +311,12 @@ def execute(self, exec_context: knext.ExecutionContext, input_1): 0.4, "Writing file (This might take a while without progress changes)" ) + import os + + output_dir = os.path.dirname(self.data_url) + if output_dir and not os.path.exists(output_dir): + os.makedirs(output_dir, exist_ok=True) + gdf = gp.GeoDataFrame(input_1.to_pandas(), geometry=self.geo_col) if "" in gdf.columns: gdf = gdf.drop(columns="") @@ -232,8 +324,12 @@ def execute(self, exec_context: knext.ExecutionContext, input_1): gdf = gdf.drop(columns="") if self.dataformat == "Shapefile": fileurl = knut.ensure_file_extension(self.data_url, ".shp") - self.__check_overwrite(fileurl) - gdf.to_file(fileurl) + check_overwrite(fileurl, self.existing_file) + if self.encoding == _EncodingOptions.AUTO.name: + gdf.to_file(fileurl) + else: + gdf.to_file(fileurl, encoding=self.encoding) + elif self.dataformat == "GeoParquet": if self.parquet_compression == Compression.NONE.name: file_extension = ".parquet" @@ -248,23 +344,24 @@ def execute(self, exec_context: knext.ExecutionContext, input_1): file_extension = ".parquet.snappy" compression = "snappy" fileurl = knut.ensure_file_extension(self.data_url, file_extension) - self.__check_overwrite(fileurl) + check_overwrite(fileurl, self.existing_file) gdf.to_parquet(fileurl, compression=compression) - else: + elif self.dataformat == "GeoJSON": fileurl = knut.ensure_file_extension(self.data_url, ".geojson") - self.__check_overwrite(fileurl) - gdf.to_file(fileurl, driver="GeoJSON") + check_overwrite(fileurl, self.existing_file) + if self.encoding == _EncodingOptions.AUTO.name: + gdf.to_file(fileurl) + else: + gdf.to_file(fileurl, driver="GeoJSON", encoding=self.encoding) + else: + fileurl = knut.ensure_file_extension(self.data_url, ".gml") + check_overwrite(fileurl, self.existing_file) + if self.encoding == _EncodingOptions.AUTO.name: + gdf.to_file(fileurl) + else: + gdf.to_file(fileurl, driver="GML", encoding=self.encoding) return None - def __check_overwrite(self, fileurl): - if self.existing_file == ExistingFile.FAIL.name: - import os.path - - if os.path.exists(fileurl): - raise knext.InvalidParametersError( - "File already exists and should not be overwritten." - ) - ############################################ # GeoPackage Reader @@ -303,10 +400,11 @@ def __check_overwrite(self, fileurl): }, ) class GeoPackageReaderNode: - data_url = knext.StringParameter( + data_url = knext.LocalPathParameter( "Input file path", - "The file path for reading data.", - "", + "Select the file path for reading data.", + placeholder_text="Select input file path...", + validator=validate_path, ) data_layer = knext.StringParameter( @@ -315,6 +413,15 @@ class GeoPackageReaderNode: "", ) + encoding = knext.EnumParameter( + label="Encoding", + description="Select the encoding for reading the data file.", + default_value=_EncodingOptions.get_default().name, + enum=_EncodingOptions, + since_version="1.3.0", + is_advanced=True, + ) + def configure(self, configure_context): # TODO Create combined schema return None @@ -327,27 +434,33 @@ def execute(self, exec_context: knext.ExecutionContext): import pandas as pd layerlist = fiona.listlayers(self.data_url) - pnumber = pd.Series(range(0, 100)).astype(str).to_list() - if self.data_layer in layerlist: - src = fiona.open(self.data_url, layer=self.data_layer) - elif self.data_layer in pnumber: - nlayer = int(self.data_layer) - src = fiona.open(self.data_url, layer=nlayer) + layer = self._get_layer(layerlist) + + if self.encoding == _EncodingOptions.AUTO.name: + gdf = gp.read_file( + self.data_url, layer=layer, engine="pyogrio", on_invalid="ignore" + ) else: - src = fiona.open(self.data_url, layer=0) - gdf = gp.GeoDataFrame.from_features(src) - try: - gdf.crs = src.crs - except: - print("Invalid CRS") - gdf = gdf.reset_index(drop=True) - if "" in gdf.columns: - gdf = gdf.drop(columns="") - if "" in gdf.columns: - gdf = gdf.drop(columns="") + gdf = gp.read_file( + self.data_url, + layer=layer, + engine="pyogrio", + on_invalid="ignore", + encoding=self.encoding, + ) + + gdf = clean_dataframe(gdf) + listtable = pd.DataFrame({"layerlist": layerlist}) return knext.Table.from_pandas(gdf), knext.Table.from_pandas(listtable) + def _get_layer(self, layerlist): + if self.data_layer in layerlist: + return self.data_layer + elif self.data_layer.isdigit() and 0 <= int(self.data_layer) < 100: + return int(self.data_layer) + return 0 + ############################################ # GeoPackage Writer @@ -387,10 +500,11 @@ class GeoPackageWriterNode: include_none_column=False, ) - data_url = knext.StringParameter( + data_url = knext.LocalPathParameter( "Output file path", - "The file path for saving data.", - "", + "Select the file path for saving data.", + placeholder_text="Select output file path...", + validator=validate_path, ) data_layer = knext.StringParameter( @@ -399,6 +513,27 @@ class GeoPackageWriterNode: "new", ) + encoding = knext.EnumParameter( + label="Encoding", + description="Select the encoding for reading the data file.", + default_value=_EncodingOptions.get_default().name, + enum=_EncodingOptions, + since_version="1.3.0", + is_advanced=True, + ) + + existing_file = knext.EnumParameter( + "If exists:", + "Specify the behavior of the node in case the output file already exists.", + lambda v: ( + ExistingFile.OVERWRITE.name + if v < knext.Version(1, 3, 0) + else ExistingFile.FAIL.name + ), + enum=ExistingFile, + since_version="1.3.0", + ) + def configure(self, configure_context, input_schema): self.geo_col = knut.column_exists_or_preset( configure_context, self.geo_col, input_schema, knut.is_geo @@ -409,6 +544,13 @@ def execute(self, exec_context: knext.ExecutionContext, input_1): exec_context.set_progress( 0.4, "Writing file (This might take a while without progress changes)" ) + import os + + check_overwrite(self.data_url, self.existing_file) + output_dir = os.path.dirname(self.data_url) + if output_dir and not os.path.exists(output_dir): + os.makedirs(output_dir, exist_ok=True) + gdf = gp.GeoDataFrame(input_1.to_pandas(), geometry=self.geo_col) gdf = gdf.reset_index(drop=True) file_name = knut.ensure_file_extension(self.data_url, ".gpkg") @@ -419,9 +561,14 @@ def execute(self, exec_context: knext.ExecutionContext, input_1): ).columns if len(time_columns) > 0: gdf[time_columns] = gdf[time_columns].astype(str) - if "" in gdf.columns: - gdf = gdf.drop(columns="") - if "" in gdf.columns: - gdf = gdf.drop(columns="") - gdf.to_file(file_name, layer=self.data_layer, driver="GPKG") + + gdf = clean_dataframe(gdf) + + if self.encoding == _EncodingOptions.AUTO.name: + gdf.to_file(file_name, layer=self.data_layer, driver="GPKG") + else: + gdf.to_file( + file_name, layer=self.data_layer, driver="GPKG", encoding=self.encoding + ) + return None