
Concurrent hsload #257

Open · wants to merge 9 commits into master
5 changes: 5 additions & 0 deletions h5pyd/_apps/hsload.py
@@ -123,6 +123,8 @@ def main():
help="use the given compression algorithm for -z option (lz4 is default)")
cfg.setitem("ignorefilters", False, flags=["--ignore-filters"], help="ignore any filters used by source dataset")
cfg.setitem("retries", 3, flags=["--retries",], choices=["N",], help="Set number of server retry attempts")
cfg.setitem("no_checks", False, flags=["--no-checks"], help="do not check for existence before creating resources")
cfg.setitem("thread_count", 30, flags=["--thread-count"], choices=["N",] ,help="The number of threads to allocate when making requests in parallel, defaults to 30")
cfg.setitem("help", False, flags=["-h", "--help"], help="this message")

try:
@@ -258,6 +260,7 @@ def main():
"bucket": cfg["hs_bucket"],
"mode": mode,
"retries": int(cfg["retries"]),
"thread_count": int(cfg["thread_count"]),
}

fout = h5pyd.File(tgt, **kwargs)
@@ -323,6 +326,8 @@ def main():
"extend_offset": cfg["extend_offset"],
"ignore_error": cfg["ignore_error"],
"no_clobber": no_clobber,
"no_checks": cfg["no_checks"],
"thread_count": int(cfg["thread_count"]),
}
load_file(fin, fout, **kwargs)

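A hypothetical invocation exercising the new flags (the source file and target domain below are illustrative only): hsload --thread-count 50 --no-checks data.h5 /home/myuser/data.h5. Programmatically, the same pool size can be passed when opening the target domain; a minimal sketch:

import h5pyd

# thread_count sizes the HTTP connection pool used for parallel requests
fout = h5pyd.File("/home/myuser/data.h5", mode="w", retries=3, thread_count=50)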
72 changes: 48 additions & 24 deletions h5pyd/_apps/utillib.py
@@ -12,6 +12,7 @@

import sys
import logging
import concurrent.futures

try:
import h5py
@@ -67,7 +68,7 @@ def dump_dtype(dt):

def is_h5py(obj):
# Return True if obj is an h5py object and False if not
if isinstance(obj, object) and isinstance(obj.id.id, int):
if isinstance(obj, object) and isinstance(obj.id.id, int): # type: ignore
return True
else:
return False
@@ -233,7 +234,7 @@ def convert_dtype(srcdt, ctx):
tgt_dt = h5pyd.special_dtype(vlen=str)
else:
tgt_dt = srcdt
return tgt_dt
return tgt_dt # type: ignore


def guess_chunk(shape, typesize):
Expand Down Expand Up @@ -615,7 +616,7 @@ def get_chunktable_dims(dset):
table_dims = []
for dim in range(rank):
dset_extent = dset.shape[dim]
chunk_extent = chunk_dims[dim]
chunk_extent = chunk_dims[dim] # type: ignore

if dset_extent > 0 and chunk_extent > 0:
table_extent = -(dset_extent // -chunk_extent)
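The negated floor division is Python's ceiling-division idiom, counting how many chunks are needed to cover each dimension; a quick check with illustrative extents:

# -(a // -b) == ceil(a / b) for positive ints:
# a dataset extent of 100 with a chunk extent of 30 needs 4 table entries
assert -(100 // -30) == 4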
@@ -838,7 +839,7 @@ def create_chunktable(dset, dset_dims, ctx):
chunktable_maxshape = [None,] if extend else []
chunktable_maxshape.extend(get_chunktable_dims(dset))
chunk_dims = [1,] if extend else []
chunk_dims.extend(get_chunk_dims(dset))
chunk_dims.extend(get_chunk_dims(dset)) # type: ignore

fout = ctx["fout"]
kwargs = {}
@@ -916,7 +917,7 @@ def create_chunktable(dset, dset_dims, ctx):

chunk_key = ""
for dim in range(rank):
chunk_key += str(index[dim] // chunk_dims[dim])
chunk_key += str(index[dim] // chunk_dims[dim]) # type: ignore
if dim < rank - 1:
chunk_key += "_"
logging.debug(f"adding chunk_key: {chunk_key}")
@@ -1024,15 +1025,15 @@ def update_chunktable(src, tgt, ctx):
chunktable = fout[f"datasets/{chunktable_id}"]
chunk_arr = get_chunk_locations(src, ctx, include_file_uri=extend)

msg = f"dataset chunk dimensions {chunktable.shape} not compatible with {chunk_arr.shape}"
if len(chunktable.shape) == len(chunk_arr.shape):
if chunktable.shape != chunk_arr.shape:
msg = f"dataset chunk dimensions {chunktable.shape} not compatible with {chunk_arr.shape}" # type: ignore
if len(chunktable.shape) == len(chunk_arr.shape): # type: ignore
if chunktable.shape != chunk_arr.shape: # type: ignore
logging.error(msg)
if not ctx["ignore_error"]:
raise IOError(msg)
return
elif len(chunk_arr.shape) + 1 == len(chunktable.shape):
if chunk_arr.shape != chunktable.shape[1:]:
elif len(chunk_arr.shape) + 1 == len(chunktable.shape): # type: ignore
if chunk_arr.shape != chunktable.shape[1:]: # type: ignore
logging.error(msg)
return
else:
@@ -1069,7 +1070,7 @@ def update_chunktable(src, tgt, ctx):
for i in range(len(chunk_indices)):
index.append(int(chunk_indices[i]))
index = tuple(index)
chunk_arr[index] = v
chunk_arr[index] = v # type: ignore
elif src_layout_class == "H5D_CHUNKED_REF_INDIRECT":
file_uri = src_layout["file_uri"]
orig_chunktable_id = src_layout["chunk_table"]
@@ -1088,7 +1089,7 @@ def update_chunktable(src, tgt, ctx):
tgt_index = [0,]
tgt_index.extend(it.multi_index)
tgt_index = tuple(tgt_index)
chunk_arr[it.multi_index] = e
chunk_arr[it.multi_index] = e # type: ignore
else:
msg = f"expected chunk ref class but got: {src_layout_class}"
logging.error(msg)
@@ -1169,7 +1170,7 @@ def create_dataset(dobj, ctx):
print(msg)
fout = ctx["fout"]

if dobj.name in fout:
if not ctx["no_checks"] and dobj.name in fout:
dset = fout[dobj.name]
logging.debug(f"{dobj.name} already exists")
if ctx["no_clobber"]:
@@ -1244,7 +1245,7 @@ def create_dataset(dobj, ctx):
rank = 0
else:
tgt_shape.extend(dobj.shape)
tgt_maxshape.extend(dobj.maxshape)
tgt_maxshape.extend(dobj.maxshape) # type: ignore
rank = len(tgt_shape)
if rank > 0 and ctx["extend_dim"]:
# set maxshape to unlimited for any dimension that is the extend_dim
@@ -1259,8 +1260,8 @@
if ctx["verbose"]:
print(msg)
for i in range(rank):
tgt_shape[i] = 0
tgt_maxshape[i] = None
tgt_shape[i] = 0 # type: ignore
tgt_maxshape[i] = None # type: ignore
else:
# check to see if any dimension scale refers to the extend dim
for dim in range(len(dobj.dims)):
@@ -1272,8 +1273,8 @@
msg = f"dimscale for dim: {dim}: {dimscale}, type: {type(dimscale)}"
logging.debug(msg)
if dimscale.name.split("/")[-1] == ctx["extend_dim"]:
tgt_shape[dim] = 0
tgt_maxshape[dim] = None
tgt_shape[dim] = 0 # type: ignore
tgt_maxshape[dim] = None # type: ignore
msg = f"setting dimension {dim} of dataset {dobj.name} to unlimited"
logging.info(msg)
if ctx["verbose"]:
@@ -1596,7 +1597,7 @@ def write_dataset(src, tgt, ctx):
if ctx["verbose"]:
print(msg)
except (IOError, TypeError) as e:
msg = f"ERROR : failed to copy dataset data {src_s}: {e}"
msg = f"ERROR : failed to copy dataset data {src_s}: {e}" # type: ignore
logging.error(msg)
if not ctx["ignore_error"]:
raise
@@ -1687,7 +1688,7 @@ def create_group(gobj, ctx):

grp = None

if gobj.name in fout:
if not ctx["no_checks"] and gobj.name in fout:
grp = fout[gobj.name]
logging.debug(f"{gobj.name} already exists")
if ctx["no_clobber"]:
@@ -1708,7 +1709,7 @@ def create_group(gobj, ctx):
if not ctx["ignore_error"]:
raise IOError(msg)
else:
if ctx["verbose"]:
if not ctx["no_checks"] and ctx["verbose"]:
print(f"{gobj.name} not found")

grp = fout.create_group(gobj.name)
@@ -1799,6 +1800,8 @@ def load_file(
extend_dim=None,
extend_offset=0,
ignore_error=False,
no_checks=False,
thread_count=30,
):

logging.info(f"input file: {fin.filename}")
@@ -1834,6 +1837,8 @@ def load_file(
ctx["extend_offset"] = extend_offset
ctx["srcid_desobj_map"] = {}
ctx["ignore_error"] = ignore_error
ctx["no_checks"] = no_checks
ctx["thread_count"] = thread_count

def copy_attribute_helper(name, obj):
logging.info(f"copy attribute - name: {name} obj: {obj.name}")
@@ -1875,6 +1880,24 @@ def object_copy_helper(name, obj):
else:
logging.error(f"no handler for object class: {type(obj)}")

def _visit_in_parallel(func):
logging.info("in parallel...")
jobs = []

def _add_to_jobs(name, obj):
jobs.append((name, obj))

with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as executor:
fin.visititems(_add_to_jobs)
futures = [executor.submit(func, item[0], item[1]) for item in jobs]

for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
logging.exception(e)
raise

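A standalone sketch of the fan-out pattern above, isolating its error semantics: the first worker exception re-raised by future.result() propagates out of the with block, whose exit then waits for tasks already in flight (names here are illustrative):

import concurrent.futures
import logging

def run_parallel(func, jobs, thread_count=30):
    # one task per (name, obj) pair; fail fast on the first error
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as executor:
        futures = [executor.submit(func, name, obj) for (name, obj) in jobs]
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # re-raises any exception from the worker
            except Exception:
                logging.exception("parallel visit failed")
                raise  # leaving the with block waits for in-flight tasks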
# build a rough map of the file using the internal function above
# copy over any attributes
# create soft/external links (and hardlinks not already created)
@@ -1886,16 +1909,17 @@ def object_copy_helper(name, obj):

# copy over any attributes
logging.info("creating target attributes")
fin.visititems(copy_attribute_helper)
_visit_in_parallel(copy_attribute_helper)

# create soft/external links (and hardlinks not already created)
create_links(fin, fout, ctx) # create root soft/external links
fin.visititems(object_link_helper)

_visit_in_parallel(object_link_helper)

if dataload == "ingest" or dataload == "link":
# copy dataset data
logging.info("copying dataset data")
fin.visititems(object_copy_helper)
_visit_in_parallel(object_copy_helper)
else:
logging.info("skipping dataset data copy (dataload is None)")

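A minimal end-to-end sketch of driving the parallel load; the filenames and target domain are illustrative, and the remaining load_file keywords are assumed to keep their defaults:

import h5py
import h5pyd
from h5pyd._apps.utillib import load_file

with h5py.File("data.h5", "r") as fin:
    fout = h5pyd.File("/home/myuser/data.h5", mode="w", thread_count=50)
    # no_checks skips the existence probe before each create;
    # thread_count sizes the visit pool used above
    load_file(fin, fout, no_checks=True, thread_count=50)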
2 changes: 2 additions & 0 deletions h5pyd/_hl/files.py
@@ -279,6 +279,7 @@ def __init__(
track_order=None,
retries=10,
timeout=180,
thread_count=30,
**kwds,
):
"""Create a new file object.
@@ -419,6 +420,7 @@ def __init__(
logger=logger,
retries=retries,
timeout=timeout,
thread_count=thread_count,
)

root_json = None
2 changes: 2 additions & 0 deletions h5pyd/_hl/folders.py
@@ -83,6 +83,7 @@ def __init__(
owner=None,
batch_size=1000,
retries=3,
thread_count=30,
**kwds,
):
"""Create a new Folders object.
Expand Down Expand Up @@ -179,6 +180,7 @@ def __init__(
mode=mode,
logger=logger,
retries=retries,
thread_count=thread_count,
)
self.log = self._http_conn.logging

11 changes: 8 additions & 3 deletions h5pyd/_hl/httpconn.py
@@ -166,6 +166,7 @@ def __init__(
logger=None,
retries=3,
timeout=DEFAULT_TIMEOUT,
thread_count=30,
**kwds,
):
self._domain = domain_name
@@ -179,6 +180,8 @@
self._api_key = api_key
self._s = None # Sessions
self._server_info = None
self._thread_count = thread_count

if use_cache:
self._cache = {}
self._objdb = {}
@@ -441,10 +444,12 @@ def GET(self, req, format="json", params=None, headers=None, use_cache=True):

if check_cache:
self.log.debug("httpcon - checking cache")
if req in self._cache:
try:
self.log.debug("httpcon - returning cache result")
rsp = self._cache[req]
return rsp
except KeyError:
pass

self.log.info(f"GET: {self._endpoint + req} [{params['domain']}] timeout: {self._timeout}")

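Replacing the membership test with try/except closes a small race: with many threads sharing the connection, an entry could disappear between the `in` check and the lookup. A minimal sketch of the two forms (the cache here is a plain dict):

cache = {}

# LBYL: check, then act; another thread may evict the key in between
if "req" in cache:
    rsp = cache["req"]  # can raise KeyError under concurrent mutation

# EAFP: a single dict lookup, atomic for built-in dicts in CPython
try:
    rsp = cache["req"]
except KeyError:
    rsp = None  # cache miss; fall through to a real GET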
@@ -737,11 +742,11 @@ def session(self):

s.mount(
"http://",
HTTPAdapter(max_retries=retry, pool_connections=16, pool_maxsize=16),
HTTPAdapter(max_retries=retry, pool_connections=self._thread_count, pool_maxsize=self._thread_count),
)
s.mount(
"https://",
HTTPAdapter(max_retries=retry, pool_connections=16, pool_maxsize=16),
HTTPAdapter(max_retries=retry, pool_connections=self._thread_count, pool_maxsize=self._thread_count),
)
self._s = s
else:
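Sizing the urllib3 pool to the thread count matters because the previous fixed pool of 16 connections could not serve 30 concurrent workers: urllib3 then discards surplus connections and can log "Connection pool is full" warnings. A minimal requests sketch of the same configuration (the retry settings are illustrative):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

thread_count = 30
retry = Retry(total=3, backoff_factor=1)
adapter = HTTPAdapter(max_retries=retry,
                      pool_connections=thread_count,
                      pool_maxsize=thread_count)
s = requests.Session()
# one pooled connection per worker thread avoids pool exhaustion
s.mount("http://", adapter)
s.mount("https://", adapter)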