from .. import filesystem from ..asyn import AsyncFileSystem class DirFileSystem(AsyncFileSystem): """Directory prefix filesystem The DirFileSystem is a filesystem-wrapper. It assumes every path it is dealing with is relative to the `path`. After performing the necessary paths operation it delegates everything to the wrapped filesystem. """ protocol = "dir" def __init__( self, path=None, fs=None, fo=None, target_protocol=None, target_options=None, **storage_options, ): """ Parameters ---------- path: str Path to the directory. fs: AbstractFileSystem An instantiated filesystem to wrap. target_protocol, target_options: if fs is none, construct it from these fo: str Alternate for path; do not provide both """ super().__init__(**storage_options) if fs is None: fs = filesystem(protocol=target_protocol, **(target_options or {})) if (path is not None) ^ (fo is not None) is False: raise ValueError("Provide path or fo, not both") path = path or fo if self.asynchronous and not fs.async_impl: raise ValueError("can't use asynchronous with non-async fs") if fs.async_impl and self.asynchronous != fs.asynchronous: raise ValueError("both dirfs and fs should be in the same sync/async mode") self.path = fs._strip_protocol(path) self.fs = fs def _join(self, path): if isinstance(path, str): if not self.path: return path if not path: return self.path return self.fs.sep.join((self.path, self._strip_protocol(path))) if isinstance(path, dict): return {self._join(_path): value for _path, value in path.items()} return [self._join(_path) for _path in path] def _relpath(self, path): if isinstance(path, str): if not self.path: return path # We need to account for S3FileSystem returning paths that do not # start with a '/' if path == self.path or ( self.path.startswith(self.fs.sep) and path == self.path[1:] ): return "" prefix = self.path + self.fs.sep if self.path.startswith(self.fs.sep) and not path.startswith(self.fs.sep): prefix = prefix[1:] assert path.startswith(prefix) return path[len(prefix) :] return [self._relpath(_path) for _path in path] # Wrappers below @property def sep(self): return self.fs.sep async def set_session(self, *args, **kwargs): return await self.fs.set_session(*args, **kwargs) async def _rm_file(self, path, **kwargs): return await self.fs._rm_file(self._join(path), **kwargs) def rm_file(self, path, **kwargs): return self.fs.rm_file(self._join(path), **kwargs) async def _rm(self, path, *args, **kwargs): return await self.fs._rm(self._join(path), *args, **kwargs) def rm(self, path, *args, **kwargs): return self.fs.rm(self._join(path), *args, **kwargs) async def _cp_file(self, path1, path2, **kwargs): return await self.fs._cp_file(self._join(path1), self._join(path2), **kwargs) def cp_file(self, path1, path2, **kwargs): return self.fs.cp_file(self._join(path1), self._join(path2), **kwargs) async def _copy( self, path1, path2, *args, **kwargs, ): return await self.fs._copy( self._join(path1), self._join(path2), *args, **kwargs, ) def copy(self, path1, path2, *args, **kwargs): return self.fs.copy( self._join(path1), self._join(path2), *args, **kwargs, ) async def _pipe(self, path, *args, **kwargs): return await self.fs._pipe(self._join(path), *args, **kwargs) def pipe(self, path, *args, **kwargs): return self.fs.pipe(self._join(path), *args, **kwargs) async def _pipe_file(self, path, *args, **kwargs): return await self.fs._pipe_file(self._join(path), *args, **kwargs) def pipe_file(self, path, *args, **kwargs): return self.fs.pipe_file(self._join(path), *args, **kwargs) async def _cat_file(self, path, *args, **kwargs): return await self.fs._cat_file(self._join(path), *args, **kwargs) def cat_file(self, path, *args, **kwargs): return self.fs.cat_file(self._join(path), *args, **kwargs) async def _cat(self, path, *args, **kwargs): ret = await self.fs._cat( self._join(path), *args, **kwargs, ) if isinstance(ret, dict): return {self._relpath(key): value for key, value in ret.items()} return ret def cat(self, path, *args, **kwargs): ret = self.fs.cat( self._join(path), *args, **kwargs, ) if isinstance(ret, dict): return {self._relpath(key): value for key, value in ret.items()} return ret async def _put_file(self, lpath, rpath, **kwargs): return await self.fs._put_file(lpath, self._join(rpath), **kwargs) def put_file(self, lpath, rpath, **kwargs): return self.fs.put_file(lpath, self._join(rpath), **kwargs) async def _put( self, lpath, rpath, *args, **kwargs, ): return await self.fs._put( lpath, self._join(rpath), *args, **kwargs, ) def put(self, lpath, rpath, *args, **kwargs): return self.fs.put( lpath, self._join(rpath), *args, **kwargs, ) async def _get_file(self, rpath, lpath, **kwargs): return await self.fs._get_file(self._join(rpath), lpath, **kwargs) def get_file(self, rpath, lpath, **kwargs): return self.fs.get_file(self._join(rpath), lpath, **kwargs) async def _get(self, rpath, *args, **kwargs): return await self.fs._get(self._join(rpath), *args, **kwargs) def get(self, rpath, *args, **kwargs): return self.fs.get(self._join(rpath), *args, **kwargs) async def _isfile(self, path): return await self.fs._isfile(self._join(path)) def isfile(self, path): return self.fs.isfile(self._join(path)) async def _isdir(self, path): return await self.fs._isdir(self._join(path)) def isdir(self, path): return self.fs.isdir(self._join(path)) async def _size(self, path): return await self.fs._size(self._join(path)) def size(self, path): return self.fs.size(self._join(path)) async def _exists(self, path): return await self.fs._exists(self._join(path)) def exists(self, path): return self.fs.exists(self._join(path)) async def _info(self, path, **kwargs): return await self.fs._info(self._join(path), **kwargs) def info(self, path, **kwargs): return self.fs.info(self._join(path), **kwargs) async def _ls(self, path, detail=True, **kwargs): ret = (await self.fs._ls(self._join(path), detail=detail, **kwargs)).copy() if detail: out = [] for entry in ret: entry = entry.copy() entry["name"] = self._relpath(entry["name"]) out.append(entry) return out return self._relpath(ret) def ls(self, path, detail=True, **kwargs): ret = self.fs.ls(self._join(path), detail=detail, **kwargs).copy() if detail: out = [] for entry in ret: entry = entry.copy() entry["name"] = self._relpath(entry["name"]) out.append(entry) return out return self._relpath(ret) async def _walk(self, path, *args, **kwargs): async for root, dirs, files in self.fs._walk(self._join(path), *args, **kwargs): yield self._relpath(root), dirs, files def walk(self, path, *args, **kwargs): for root, dirs, files in self.fs.walk(self._join(path), *args, **kwargs): yield self._relpath(root), dirs, files async def _glob(self, path, **kwargs): detail = kwargs.get("detail", False) ret = await self.fs._glob(self._join(path), **kwargs) if detail: return {self._relpath(path): info for path, info in ret.items()} return self._relpath(ret) def glob(self, path, **kwargs): detail = kwargs.get("detail", False) ret = self.fs.glob(self._join(path), **kwargs) if detail: return {self._relpath(path): info for path, info in ret.items()} return self._relpath(ret) async def _du(self, path, *args, **kwargs): total = kwargs.get("total", True) ret = await self.fs._du(self._join(path), *args, **kwargs) if total: return ret return {self._relpath(path): size for path, size in ret.items()} def du(self, path, *args, **kwargs): total = kwargs.get("total", True) ret = self.fs.du(self._join(path), *args, **kwargs) if total: return ret return {self._relpath(path): size for path, size in ret.items()} async def _find(self, path, *args, **kwargs): detail = kwargs.get("detail", False) ret = await self.fs._find(self._join(path), *args, **kwargs) if detail: return {self._relpath(path): info for path, info in ret.items()} return self._relpath(ret) def find(self, path, *args, **kwargs): detail = kwargs.get("detail", False) ret = self.fs.find(self._join(path), *args, **kwargs) if detail: return {self._relpath(path): info for path, info in ret.items()} return self._relpath(ret) async def _expand_path(self, path, *args, **kwargs): return self._relpath( await self.fs._expand_path(self._join(path), *args, **kwargs) ) def expand_path(self, path, *args, **kwargs): return self._relpath(self.fs.expand_path(self._join(path), *args, **kwargs)) async def _mkdir(self, path, *args, **kwargs): return await self.fs._mkdir(self._join(path), *args, **kwargs) def mkdir(self, path, *args, **kwargs): return self.fs.mkdir(self._join(path), *args, **kwargs) async def _makedirs(self, path, *args, **kwargs): return await self.fs._makedirs(self._join(path), *args, **kwargs) def makedirs(self, path, *args, **kwargs): return self.fs.makedirs(self._join(path), *args, **kwargs) def rmdir(self, path): return self.fs.rmdir(self._join(path)) def mv(self, path1, path2, **kwargs): return self.fs.mv( self._join(path1), self._join(path2), **kwargs, ) def touch(self, path, **kwargs): return self.fs.touch(self._join(path), **kwargs) def created(self, path): return self.fs.created(self._join(path)) def modified(self, path): return self.fs.modified(self._join(path)) def sign(self, path, *args, **kwargs): return self.fs.sign(self._join(path), *args, **kwargs) def __repr__(self): return f"{self.__class__.__qualname__}(path='{self.path}', fs={self.fs})" def open( self, path, *args, **kwargs, ): return self.fs.open( self._join(path), *args, **kwargs, )