Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Replace mounted file_path with actual OSS path #347

Merged
merged 4 commits into from
Jan 21, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion src/pai_rag/tools/data_process/ops/parser_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import threading
from typing import List
from loguru import logger
from pathlib import Path
from urllib.parse import urlparse
from pai_rag.core.rag_module import resolve
from pai_rag.utils.oss_client import OssClient
from pai_rag.tools.data_process.ops.base_op import BaseOP, OPERATORS
Expand Down Expand Up @@ -55,6 +57,23 @@ def __init__(
reader_config=self.data_reader_config,
oss_store=self.oss_store,
)
self.mount_path = os.environ.get("INPUT_MOUNT_PATH", None).strip("/")
self.real_path = os.environ.get("OSS_SOURCE_PATH", None).strip("/")
if self.mount_path and self.real_path:
self.mount_path = Path(self.mount_path).resolve()
real_uri = urlparse(self.real_path)
if not real_uri.scheme:
logger.error(
f"Real path '{self.real_path}' must include a URI scheme (e.g., 'oss://')."
)
self.should_replace = False
else:
self.should_replace = True
else:
self.should_replace = False
logger.warning(
"Environment variables for paths are not fully set. Path replacement will be skipped."
)
logger.info(
f"""ParserActor [PaiDataReader] init finished with following parameters:
concat_csv_rows: {concat_csv_rows}
Expand All @@ -63,14 +82,41 @@ def __init__(
sheet_column_filters: {sheet_column_filters}
oss_bucket: {oss_bucket}
oss_endpoint: {oss_endpoint}
path_should_replace: {self.should_replace}
"""
)

def replace_mount_with_real_path(self, document):
if not self.should_replace:
return document

try:
file_path = document.metadata["file_path"]
file_path_obj = Path(file_path).resolve()
relative_path_str = (
file_path_obj.relative_to(self.mount_path).as_posix().strip("/")
)
document.metadata["file_path"] = f"{self.real_path}/{relative_path_str}"
document.metadata["mount_path"] = file_path
logger.debug(
f"Replacing path: {file_path} --> {document.metadata['file_path']}"
)
return document
except ValueError:
# file_path 不以 mount_path 开头
logger.debug(
f"Path {document.metadata['file_path']} does not start with mount path {self.mount_path}. No replacement done."
)
return document
except Exception as e:
logger.error(f"Error replacing path {document.metadata['file_path']}: {e}")
return document

def process(self, input_file):
current_thread = threading.current_thread()
logger.info(f"当前线程的 ID: {current_thread.ident} 进程ID: {os.getpid()}")
documents = self.data_reader.load_data(file_path_or_directory=input_file)
if len(documents) == 0:
logger.info(f"No data found in the input file: {input_file}")
return None
return convert_document_to_dict(documents[0])
return convert_document_to_dict(self.replace_mount_with_real_path(documents[0]))
Loading