Skip to content

Commit

Permalink
Allow using default fs for fsspec and pass storage options to client
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman committed Nov 7, 2024
1 parent 0db3e77 commit cb99fa9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
20 changes: 12 additions & 8 deletions python/hdfs_native/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
class HdfsFileSystem(AbstractFileSystem):
root_marker = "/"

def __init__(self, host: str, port: Optional[int] = None, *args, **storage_options):
def __init__(self, host: Optional[str] = None, port: Optional[int] = None, *args, **storage_options):
super().__init__(host, port, *args, **storage_options)
self.host = host
self.port = port
url = f"{self.protocol}://{host}"
if port:
url += f":{port}"
self.client = Client(url)
url = f"{self.protocol}://"
if host:
url += host
if port:
url += f":{port}"
self.client = Client(url, storage_options)

@property
def fsid(self):
Expand All @@ -40,9 +42,11 @@ def _strip_protocol(cls, path: str) -> str:
def unstrip_protocol(self, name: str) -> str:
path = self._strip_protocol(name)

url = f"{self.protocol}://{self.host}"
if self.port:
url += f":{self.port}"
url = f"{self.protocol}://"
if self.host:
url += self.host
if self.port:
url += f":{self.port}"

return f"{url}{path}"

Expand Down
5 changes: 5 additions & 0 deletions python/tests/test_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
from hdfs_native.fsspec import HdfsFileSystem


def test_config(minidfs: str):
url = urllib.parse.urlparse(minidfs)
fs: HdfsFileSystem = fsspec.filesystem(url.scheme, **{'fs.defaultFS': minidfs})
assert len(fs.ls("/")) == 0

def test_dirs(fs: HdfsFileSystem):
fs.mkdir("/testdir")
assert fs.info("/testdir")["type"] == "directory"
Expand Down
18 changes: 12 additions & 6 deletions rust/src/hdfs/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,18 @@ impl NameServiceProxy {
todo!()
};

Ok(NameServiceProxy {
proxy_connections,
current_active: AtomicUsize::new(0),
current_observers: Arc::new(Mutex::new(HashSet::new())),
msycned: AtomicBool::new(false),
})
if proxy_connections.is_empty() {
Err(HdfsError::InvalidArgument(
"No NameNode hosts found".to_string(),
))
} else {
Ok(NameServiceProxy {
proxy_connections,
current_active: AtomicUsize::new(0),
current_observers: Arc::new(Mutex::new(HashSet::new())),
msycned: AtomicBool::new(false),
})
}
}

async fn msync_if_needed(&self, write: bool) -> Result<()> {
Expand Down

0 comments on commit cb99fa9

Please # to comment.