5 changes: 5 additions & 0 deletions docs/guide.rst
@@ -13,6 +13,11 @@ The code is designed based on the following assumptions:
If we have input slices of shape ``(nx, ny)``, and an output chunk shape of ``(nc, nc, nc)`` it makes sense to split the conversion into individual 'slabs' that have shape ``(nx, ny, nc)``.
This means there is a one-to-one mapping from slices to slabs, and slabs to chunks, allowing each slab to be processed in parallel without interfering with the other slabs.
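For example, with ``nx = 1000``, ``ny = 800`` and ``nc = 64`` (illustrative numbers, not defaults), each slab has shape ``(1000, 800, 64)`` and maps onto a grid of ``16 x 13 = 208`` output chunks, since the x and y extents are rounded up to a whole number of chunks.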

Sharding
--------
``stack-to-chunk`` automatically introduces a sharding codec, where each shard has shape ``(nx, ny, nc)`` (with ``nx`` and ``ny`` rounded up to a whole number of chunks).
It is strongly recommended that you don't include sharding in the array specification used to set up the original array, otherwise you will end up with nested shards.
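
For illustration (the numbers here are made up, not defaults): with an image of shape ``(1000, 800, 50)`` and a chunk shape of ``(64, 64, 64)``, the shard shape becomes ``(1024, 832, 64)`` (x and y rounded up to whole chunks, one chunk thick in z), and the sharding codec that ``stack-to-chunk`` writes looks roughly like this, wrapping whatever codecs were in the original specification:

.. code-block:: python

   {
       "name": "sharding_indexed",
       "configuration": {
           "chunk_shape": (64, 64, 64),  # the chunk shape from the array specification
           "codecs": [...],  # the codecs from the array specification
           "index_codecs": [
               {"name": "bytes", "configuration": {"endian": "little"}},
               {"name": "crc32c"},
           ],
           "index_location": "end",
       },
   }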

Third-party multi-threading
---------------------------
``stack-to-chunk`` turns off third-party multi-threading in ``blosc`` when running.
32 changes: 14 additions & 18 deletions src/stack_to_chunk/_array_helpers.py
@@ -34,13 +34,11 @@ def _copy_slab(arr_zarr: zarr.Array, slab: da.Array, zstart: int, zend: int) ->


@delayed # type: ignore[misc]
def _downsample_block(
arr_in: zarr.Array, arr_out: zarr.Array, block_idx: tuple[int, int, int]
) -> None:
def _downsample_slab(arr_in: zarr.Array, arr_out: zarr.Array, block_idx_z: int) -> None:
"""
Copy a single block from one array to the next, downsampling by a factor of two.
Copy a single slab from one array to the next, downsampling by a factor of two.

Data is copied from a block starting at `block_idx` and ending at
Data is copied from a slab starting at `block_idx_z` and ending at
`block_idx_z + 2 * arr_in.shards[2]` along the third dimension, i.e. a slab spanning the full x-y extent and two shards thick in z.
Data is downsampled using a local mean, and written to a single shard in `arr_out`.

@@ -54,28 +52,26 @@ def _downsample_block(
z-index of the slab to copy. Must be a multiple of twice the shard size of `arr_in` along the third dimension.

"""
chunk_size = arr_in.chunks[0] * 2
np.testing.assert_equal(
np.array(block_idx) % chunk_size,
np.array([0, 0, 0]),
err_msg=f"Block index {block_idx} not aligned with chunks {chunk_size}",
)
chunk_size = arr_in.shards[2] * 2
if block_idx_z % chunk_size != 0:
msg = f"Block index {block_idx_z} not aligned with chunks {chunk_size}"
raise ValueError(msg)

data = arr_in[
block_idx[0] : block_idx[0] + chunk_size,
block_idx[1] : block_idx[1] + chunk_size,
block_idx[2] : block_idx[2] + chunk_size,
:,
:,
block_idx_z : block_idx_z + chunk_size,
]
# Pad to an even number
pads = np.array(data.shape) % 2
pad_width = [(0, p) for p in pads]
data = np.pad(data, pad_width, mode="edge")
data = skimage.measure.block_reduce(data, block_size=2, func=np.mean)

block_idx_out = np.array(block_idx) // 2
block_idx_out = block_idx_z // 2
chunk_size_out = chunk_size // 2
arr_out[
block_idx_out[0] : block_idx_out[0] + chunk_size_out,
block_idx_out[1] : block_idx_out[1] + chunk_size_out,
block_idx_out[2] : block_idx_out[2] + chunk_size_out,
:,
:,
block_idx_out : block_idx_out + chunk_size_out,
] = data
98 changes: 79 additions & 19 deletions src/stack_to_chunk/main.py
@@ -16,7 +16,7 @@
from ome_zarr_models.v05.axes import Axis
from pydantic_zarr.v3 import ArraySpec

from stack_to_chunk._array_helpers import _copy_slab, _downsample_block
from stack_to_chunk._array_helpers import _copy_slab, _downsample_slab
from stack_to_chunk.ome_ngff import SPATIAL_UNIT, DatasetDict

DEFAULT_DIMENSION_NAMES = ("x", "y", "z")
@@ -53,6 +53,12 @@ class MultiScaleGroup:
does not need to be provided. Must not have dimension names set
(they are set automatically by stack-to-chunk).

Warnings
--------
stack-to-chunk adds sharding to the output, so it's recommended that the
array specification passed here does not already include a sharding codec
(if it does, there will be nested shards!)

"""

def __init__(
@@ -64,13 +70,14 @@ def __init__(
spatial_unit: SPATIAL_UNIT,
array_spec: ArraySpec | None = None,
) -> None:
path = Path(path).resolve()
self._store = zarr.storage.LocalStore(path)
self._path = path
self._name = name
self._spatial_unit = spatial_unit
self._voxel_size = self._validate_voxel_size(voxel_size)

if isinstance(path, Path) and not path.exists():
if not path.exists():
if array_spec is None:
msg = "Group does not already exist, array_spec must be provided"
raise ValueError(msg)
@@ -108,6 +115,50 @@ def _validate_dimension_names(cls, array_spec: ArraySpec) -> tuple[str, str, str

return dimension_names # type: ignore[no-any-return]

@classmethod
def _add_sharding_codec(cls, array_spec: ArraySpec) -> ArraySpec:
"""
Add a sharding codec to an array spec.

The shard size is set to span the first two dimensions, but only
to be as 'thick' as the chunk size in the third dimension.
"""
shape = array_spec.shape
chunk_grid = array_spec.chunk_grid.copy()
old_chunk_shape = chunk_grid["configuration"]["chunk_shape"]
shard_shape = (
# Because the chunk shape needs to divide the shard shape exactly,
# round the shard shape up to the nearest whole number of chunks
old_chunk_shape[0] * math.ceil(shape[0] / old_chunk_shape[0]),
old_chunk_shape[1] * math.ceil(shape[1] / old_chunk_shape[1]),
old_chunk_shape[2],
)
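# For example (illustrative numbers): a shape of (1000, 800, 50) with a chunk
# shape of (64, 64, 64) gives a shard shape of (64 * 16, 64 * 13, 64), i.e.
# (1024, 832, 64).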

old_codecs = array_spec.codecs
new_codecs = [
{
"name": "sharding_indexed",
"configuration": {
"chunk_shape": old_chunk_shape,
"codecs": old_codecs,
"index_codecs": [
{
"name": "bytes",
"configuration": {
"endian": "little",
},
},
{"name": "crc32c"},
],
"index_location": "end",
},
}
]

chunk_grid["configuration"]["chunk_shape"] = shard_shape
logger.info(f"Adding sharding codec with shard shape {shard_shape}")
return array_spec.model_copy(update={"codecs": new_codecs, "chunk_grid": chunk_grid})

def _create_zarr_group(self, array_spec: ArraySpec) -> None:
"""
Create the zarr group.
@@ -116,6 +167,7 @@ def _create_zarr_group(self, array_spec: ArraySpec) -> None:
"""
dimension_names = self._validate_dimension_names(array_spec)
array_spec = array_spec.model_copy(update={"dimension_names": dimension_names})
array_spec = self._add_sharding_codec(array_spec)

self._image: Image = Image.new(
array_specs=[array_spec],
@@ -142,7 +194,8 @@ def _create_zarr_group(self, array_spec: ArraySpec) -> None:
)
],
)
self._image.to_zarr(store=self._store, path="/")
print("hi")
self._image.to_zarr(store=self._store, path="")

@property
def _full_res_array(self) -> zarr.Array:
@@ -281,6 +334,7 @@ def add_downsample_level(self, level: int, *, n_processes: int) -> None:

"""
logger.info(f"Downsampling to level {level} with {n_processes=}")
# Validate level argument
if not (level >= 1 and int(level) == level):
msg = "level must be an integer >= 1"
raise ValueError(msg)
@@ -296,24 +350,30 @@ def add_downsample_level(self, level: int, *, n_processes: int) -> None:
msg,
)

source_arr: zarr.Array = self._group[level_minus_one]
new_shape = tuple(math.ceil(i / 2) for i in source_arr.shape)
chunk_size = source_arr.chunks[0]

sink_arr = self._group.create_array(
name=level_str,
shape=new_shape,
chunks=source_arr.chunks,
dtype=source_arr.dtype,
compressors=source_arr.compressors,
dimension_names=source_arr.metadata.dimension_names,
source_arr = self._group[level_minus_one]
source_spec = ArraySpec.from_zarr(source_arr)
# Halve the array shape, rounding up
new_shape = tuple(math.ceil(i / 2) for i in source_spec.shape)
# Recompute the outer chunk (shard) shape: shards should again span the full
# x-y extent of the new shape, keeping the same thickness in z
new_chunk_grid = source_spec.chunk_grid.copy()
old_chunk_shape = new_chunk_grid["configuration"]["chunk_shape"]
inner_chunk_shape: tuple[int, ...] = source_spec.codecs[0]["configuration"][
"chunk_shape"
]
new_chunk_grid["configuration"]["chunk_shape"] = (
inner_chunk_shape[0] * math.ceil(new_shape[0] / inner_chunk_shape[0]),
inner_chunk_shape[1] * math.ceil(new_shape[1] / inner_chunk_shape[1]),
old_chunk_shape[2],
)
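# e.g. (illustrative numbers) with new_shape (500, 400, 25), an inner chunk
# shape of (64, 64, 64) and a previous outer z-chunk of 64, this gives
# (64 * 8, 64 * 7, 64) == (512, 448, 64)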
new_chunk_shape = new_chunk_grid["configuration"]["chunk_shape"]

sink_spec = source_spec.model_copy(
update={"shape": new_shape, "chunk_grid": new_chunk_grid}
)
sink_arr = sink_spec.to_zarr(store=self._group.store_path, path=level_str)

block_indices = [
(x, y, z)
for x in range(0, source_arr.shape[0], chunk_size * 2)
for y in range(0, source_arr.shape[1], chunk_size * 2)
for z in range(0, source_arr.shape[2], chunk_size * 2)
z for z in range(0, source_spec.shape[2], new_chunk_shape[2] * 2)
]

all_args = [(source_arr, sink_arr, idxs) for idxs in block_indices]
@@ -322,7 +382,7 @@ def add_downsample_level(self, level: int, *, n_processes: int) -> None:
blosc_use_threads = blosc.use_threads
blosc.use_threads = 0

jobs = [_downsample_block(*args) for args in all_args]
jobs = [_downsample_slab(*args) for args in all_args]
logger.info(f"Launching {len(jobs)} jobs")
Parallel(n_jobs=n_processes, verbose=10)(jobs)

9 changes: 6 additions & 3 deletions src/stack_to_chunk/tests/test_main.py
@@ -387,11 +387,13 @@ def test_known_data(tmp_path: Path) -> None:


def test_padding(tmp_path: Path) -> None:
# Test data that doesn't fit exactly into (2, 2, 2) shaped chunks
"""
Test data that doesn't fit exactly into (2, 2, 2) shaped chunks.
"""
arr_npy = np.arange(8).reshape((2, 2, 2))
# Add an extra bit of data on the end
arr_npy = np.concatenate([arr_npy, [[[10, 10], [12, 16]]]], axis=0)
arr = da.from_array(arr_npy)
arr = arr.rechunk(chunks=(2, 2, 1))
arr = da.from_array(arr_npy, chunks=(2, 2, 1))

group = MultiScaleGroup(
tmp_path / "group.ome.zarr",
@@ -406,6 +408,7 @@
),
)
group.add_full_res_data(arr, n_processes=1)
np.testing.assert_equal(group[0][:], arr_npy)
group.add_downsample_level(1, n_processes=1)
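# Why [[3]] and [[12]]: the (3, 2, 2) input is edge-padded to (4, 2, 2); the first
# 2x2x2 block holds the values 0..7 (mean 3.5, truncated to 3 in the integer
# output array) and the second block repeats [[10, 10], [12, 16]] (mean 12).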
arr_downsampled = group[1]
np.testing.assert_equal(arr_downsampled[:], [[[3]], [[12]]])