2
2
import io
3
3
import logging
4
4
import mimetypes
5
+ import re
5
6
import tarfile
6
7
import uuid
7
8
import zipfile
@@ -30,7 +31,9 @@ def extract_archive_files(mime_type, file_name, file_data):
30
31
continue
31
32
with archive .open (file_info ) as file :
32
33
file_mime_type = mimetypes .guess_type (file_info .filename )[0 ]
33
- data_uri = f"data:{ file_mime_type } ;name={ file_info .filename } ;base64,{ base64 .b64encode (file .read ()).decode ()} "
34
+ filename = file_info .filename
35
+ filename = "/" .join (filename .split ("/" )[1 :])
36
+ data_uri = f"data:{ file_mime_type } ;name={ filename } ;base64,{ base64 .b64encode (file .read ()).decode ()} "
34
37
extracted_files .append (data_uri )
35
38
elif mime_type in ["application/x-tar" , "application/gzip" , "application/x-bzip2" ]:
36
39
with tarfile .open (fileobj = io .BytesIO (file_data ), mode = "r:*" ) as archive :
@@ -65,6 +68,11 @@ class ArchiveFileSchema(BaseSource):
65
68
description = "Split the archive into individual files" ,
66
69
json_schema_extra = {"advanced_parameter" : True },
67
70
)
71
+ file_regex : str = Field (
72
+ default = None ,
73
+ description = "Regex to filter files" ,
74
+ json_schema_extra = {"advanced_parameter" : True },
75
+ )
68
76
69
77
@classmethod
70
78
def slug (cls ):
@@ -89,6 +97,8 @@ def get_data_documents(self, **kwargs) -> List[DataDocument]:
89
97
for file in files :
90
98
file_id = str (uuid .uuid4 ())
91
99
mime_type , file_name , file_data = validate_parse_data_uri (file )
100
+ if self .split_files and self .file_regex and not re .match (self .file_regex , file_name ):
101
+ continue
92
102
file_objref = create_source_document_asset (
93
103
file , datasource_uuid = kwargs ["datasource_uuid" ], document_id = file_id
94
104
)
@@ -103,6 +113,7 @@ def get_data_documents(self, **kwargs) -> List[DataDocument]:
103
113
"mime_type" : mime_type ,
104
114
"source" : file_name ,
105
115
"datasource_uuid" : kwargs ["datasource_uuid" ],
116
+ "file_regex" : self .file_regex ,
106
117
},
107
118
datasource_uuid = kwargs ["datasource_uuid" ],
108
119
extra_info = {"extra_data" : self .get_extra_data ()},
@@ -121,6 +132,8 @@ def process_document(cls, document: DataDocument) -> DataDocument:
121
132
text_content = ""
122
133
for extracted_file in extracted_files :
123
134
mime_type , file_name , extracted_file_data = validate_parse_data_uri (extracted_file )
135
+ if document .metadata .get ("file_regex" ) and not re .match (document .metadata ["file_regex" ], file_name ):
136
+ continue
124
137
text_content += f"File: { file_name } \n "
125
138
decoded_file_data = base64 .b64decode (extracted_file_data )
126
139
elements += extract_text_elements (
0 commit comments