Skip to content

Commit

Permalink
add the code in #51
Browse files Browse the repository at this point in the history
  • Loading branch information
YichuanSun committed Jul 31, 2024
1 parent 7430c39 commit 689cdcf
Showing 1 changed file with 74 additions and 14 deletions.
88 changes: 74 additions & 14 deletions alluxiofs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,40 +200,100 @@ def fallback_wrapper(self, *args, **kwargs):
# process path related arguments to remove alluxiofs protocol
# since both alluxio and ufs cannot process alluxiofs protocol
# Require s3://bucket/path or /bucket/path
bound_args = signature.bind(self, *args, **kwargs)
bound_args.apply_defaults()
# paths may be passed as positional argument or kwarg arguments
# process accordingly and keep paths as positional argument if passed as positional,
# and keep as kwarg arguments if passed as kwarg.

# fsspec path parameters has different names and sequences
# use this approach to try to process all path related parameters
for param in ["path", "path1", "path2", "lpath", "rpath"]:
if param in bound_args.arguments:
bound_args.arguments[
param
] = self._strip_alluxiofs_protocol(
bound_args.arguments[param]
# process all path related parameters
possible_path_arg_names = [("path1", "path2"), ("lpath", "rpath")]
positional_params = list(args)

# get a list of arguments defined in the function
argument_list = []
for param in signature.parameters.values():
argument_list.append(param.name)

if "path" in argument_list:
if "path" in kwargs:
kwargs["path"] = self._strip_alluxiofs_protocol(
kwargs["path"]
)
else:
positional_params[0] = self._strip_alluxiofs_protocol(
positional_params[0]
)

else:
for path1, path2 in possible_path_arg_names:
if (path1 in argument_list) and (path2 in argument_list):
if (path1 in kwargs) and (path2 in kwargs):
kwargs[path1] = self._strip_alluxiofs_protocol(
kwargs[path1]
)
kwargs[path2] = self._strip_alluxiofs_protocol(
kwargs[path2]
)
elif (path1 in kwargs) and (path2 not in kwargs):
kwargs[path1] = self._strip_alluxiofs_protocol(
kwargs[path1]
)
path2_index = argument_list.index(path2) - 1
positional_params[
path2_index
] = self._strip_alluxiofs_protocol(
positional_params[path2_index]
)
elif (path1 not in kwargs) and (path2 in kwargs):
kwargs[path2] = self._strip_alluxiofs_protocol(
kwargs[path2]
)
path1_index = argument_list.index(path1) - 1
positional_params[
path1_index
] = self._strip_alluxiofs_protocol(
positional_params[path1_index]
)
else:
path1_index = argument_list.index(path1) - 1
positional_params[
path1_index
] = self._strip_alluxiofs_protocol(
positional_params[path1_index]
)
path2_index = argument_list.index(path2) - 1
positional_params[
path2_index
] = self._strip_alluxiofs_protocol(
positional_params[path2_index]
)

positional_params = tuple(positional_params)

try:
if self.alluxio:
start_time = time.time()
res = alluxio_impl(*bound_args.args, **bound_args.kwargs)
res = alluxio_impl(self, *positional_params, **kwargs)
logger.debug(
f"Exit(Ok): alluxio op({alluxio_impl.__name__}) args({bound_args.args}) kwargs({bound_args.kwargs}) time({(time.time() - start_time):.2f}s)"
f"Exit(Ok): alluxio op({alluxio_impl.__name__}) args({positional_params}) kwargs({kwargs}) time({(time.time() - start_time):.2f}s)"
)
return res
except Exception as e:
if not isinstance(e, NotImplementedError):
logger.debug(
f"Exit(Error): alluxio op({alluxio_impl.__name__}) args({bound_args.args}) kwargs({bound_args.kwargs}) {e}"
f"Exit(Error): alluxio op({alluxio_impl.__name__}) args({positional_params}) kwargs({kwargs}) {e}"
)
self.error_metrics.record_error(alluxio_impl.__name__, e)
if self.fs is None:
raise e

fs_method = getattr(self.fs, alluxio_impl.__name__, None)

if fs_method:
res = fs_method(*bound_args.args[1:], **bound_args.kwargs)
res = fs_method(*positional_params, **kwargs)

logger.debug(
f"Exit(Ok): ufs({self.target_protocol}) op({alluxio_impl.__name__}) args({bound_args.args}) kwargs({bound_args.kwargs})"
f"Exit(Ok): ufs({self.target_protocol}) op({alluxio_impl.__name__}) args({positional_params}) kwargs({kwargs})"
)
return res
raise NotImplementedError(
Expand Down

0 comments on commit 689cdcf

Please sign in to comment.