Skip to content

merge_tile_labels

blockbuster.merge_tile_labels(labeled: Union['da.Array', str, Path], write_to: Union[str, Path, None] = None, *, input_component: str = 'labels', output_component: str = 'labels', overlap: int = 0, sequential_labels: bool = False, n_workers: int | None = None, stage_dir: Union[str, Path, None] = None, keep_stage: bool = False, progress: bool = False) -> 'da.Array'

Merge per-tile labels into a globally consistent label array.

Standalone merge step — use this when you already have per-tile labels (from your own segmentation pipeline) and just need the boundary stitching.

Accepts either:

  • A dask array of per-tile integer labels (e.g. output of dask.array.map_blocks on your own segmentation function).
  • A zarr store path whose input_component array already contains per-tile labels written by your own pipeline.

Labels that touch across tile boundaries are merged into a single ID. The merge is zarr-native (boundary scan → scipy connected components → parallel relabel) — no dask task graph, scales to thousands of tiles.

Parameters:

Name Type Description Default
labeled Union['da.Array', str, Path]

Per-tile label array. Either a dask array or a path to a zarr store that contains per-tile labels in input_component.

required
write_to Union[str, Path, None]

Output zarr store path. When None, the merged labels are written to a zarr store inside an automatically created temporary directory.

None
input_component str

Array name inside a zarr input store (ignored for dask arrays).

'labels'
output_component str

Array name inside write_to. Default "labels".

'labels'
overlap int

If labeled is a dask array that was computed with da.overlap, pass the same depth here to trim the halos before merging. Set 0 (default) if the array has no overlap halos.

0
sequential_labels bool

Renumber the merged labels to a contiguous 1..N range via a cheap linear post-pass (O(voxels)). Default False.

False
n_workers int | None

Parallel workers for the relabel step. Default min(4, cpu_count).

None
stage_dir Union[str, Path, None]

Directory for the temp stage zarr when labeled is a dask array. Default: a system temp directory.

None
keep_stage bool

Keep the temp stage zarr after merging. Default False.

False
progress bool

Show a progress bar during the relabel step.

False

Returns:

Type Description
Array

Merged label array (int32) backed by write_to.

Examples:

From a dask array of per-tile labels:

>>> import numpy as np
>>> import dask.array as da
>>> from blockbuster import merge_tile_labels
>>>
>>> # your own tiling + segmentation
>>> image = da.from_zarr("image.zarr").rechunk((1, 1024, 1024))
>>> labeled = image.map_blocks(my_segment_fn, dtype="int32",
...                            meta=np.empty((0,) * image.ndim, dtype="int32"))
>>>
>>> merged = merge_tile_labels(labeled, write_to="labels.zarr", progress=True)

From a pre-staged zarr store (your pipeline already wrote labels):

>>> merged = merge_tile_labels(
...     "my_staged_labels.zarr",
...     input_component="raw_labels",
...     write_to="merged_labels.zarr",
...     sequential_labels=True,
... )

Trim overlap halos before merging:

>>> # if labeled was computed with da.overlap.overlap(depth=20)
>>> merged = merge_tile_labels(labeled, write_to="labels.zarr", overlap=20)
Source code in src/blockbuster/_merge.py
def merge_tile_labels(
    labeled: Union["da.Array", str, Path],
    write_to: Union[str, Path, None] = None,
    *,
    input_component: str = "labels",
    output_component: str = "labels",
    overlap: int = 0,
    sequential_labels: bool = False,
    n_workers: int | None = None,
    stage_dir: Union[str, Path, None] = None,
    keep_stage: bool = False,
    progress: bool = False,
) -> "da.Array":
    """Merge per-tile labels into a globally consistent label array.

    Standalone merge step — use this when you already have per-tile labels
    (from your own segmentation pipeline) and just need the boundary stitching.

    Accepts either:

    - A **dask array** of per-tile integer labels (e.g. output of
      ``dask.array.map_blocks`` on your own segmentation function).
    - A **zarr store path** whose ``input_component`` array already contains
      per-tile labels written by your own pipeline.

    Labels that **touch** across tile boundaries are merged into a single ID.
    The merge is zarr-native (boundary scan → scipy connected components →
    parallel relabel) — no dask task graph, scales to thousands of tiles.

    Parameters
    ----------
    labeled:
        Per-tile label array. Either a dask array or a path to a zarr store
        that contains per-tile labels in ``input_component``.
    write_to:
        Output zarr store path. When None, the merged labels are written to
        ``merged.zarr`` inside a freshly created temporary directory.
    input_component:
        Array name inside a zarr *input* store (ignored for dask arrays).
    output_component:
        Array name inside ``write_to``. Default ``"labels"``.
    overlap:
        If ``labeled`` is a dask array that was computed with ``da.overlap``,
        pass the same depth here to trim the halos before merging.
        Set 0 (default) if the array has no overlap halos.
        Ignored when ``labeled`` is a zarr store path.
    sequential_labels:
        Renumber the merged labels to a contiguous ``1..N`` range via a cheap
        linear post-pass (O(voxels)). Default False.
    n_workers:
        Parallel workers for the relabel step. Default ``min(4, cpu_count)``.
    stage_dir:
        Directory for the temp stage zarr when *labeled* is a dask array.
        Default: a system temp directory.
    keep_stage:
        Keep the temp stage zarr after merging. Default False. When False,
        the stage store (and the temporary directory created for it when
        ``stage_dir`` is None) is removed even if staging or merging fails.
        A store passed in as a path is never removed.
    progress:
        Show a progress bar during the relabel step (and, for dask input,
        while staging the array to zarr).

    Returns
    -------
    da.Array
        Merged label array (int32) backed by ``write_to``.

    Examples
    --------
    **From a dask array of per-tile labels:**

    >>> import numpy as np
    >>> import dask.array as da
    >>> from blockbuster import merge_tile_labels
    >>>
    >>> # your own tiling + segmentation
    >>> image = da.from_zarr("image.zarr").rechunk((1, 1024, 1024))
    >>> labeled = image.map_blocks(my_segment_fn, dtype="int32",
    ...                            meta=np.empty((0,) * image.ndim, dtype="int32"))
    >>>
    >>> merged = merge_tile_labels(labeled, write_to="labels.zarr", progress=True)

    **From a pre-staged zarr store (your pipeline already wrote labels):**

    >>> merged = merge_tile_labels(
    ...     "my_staged_labels.zarr",
    ...     input_component="raw_labels",
    ...     write_to="merged_labels.zarr",
    ...     sequential_labels=True,
    ... )

    **Trim overlap halos before merging:**

    >>> # if labeled was computed with da.overlap.overlap(depth=20)
    >>> merged = merge_tile_labels(labeled, write_to="labels.zarr", overlap=20)
    """
    import shutil

    import dask.array as da
    from ._relabel import relabel_sequential_zarr

    nw = n_workers if n_workers is not None else min(4, os.cpu_count() or 1)

    # We only "own" (and may later delete) the stage store when we create it
    # ourselves from a dask array; a user-supplied store path is read in place.
    owns_stage = not isinstance(labeled, (str, Path))
    # Set only when we had to create a temp parent directory for the stage,
    # so that cleanup can remove that directory too and not just the store.
    auto_stage_base: str | None = None
    staged_component = "staged"

    if not owns_stage:
        stage_path = str(labeled)
        staged_component = input_component
    else:
        # labeled is a dask array
        if overlap > 0:
            labeled = da.overlap.trim_overlap(labeled, depth=overlap, boundary="none")

        if stage_dir is not None:
            _base = str(stage_dir)
        else:
            _base = auto_stage_base = tempfile.mkdtemp(prefix="bb_stage_")
        stage_path = os.path.join(_base, "_bb_stage.zarr")

    try:
        # -- Stage dask array to zarr if needed --
        if owns_stage:
            import dask
            from dask.diagnostics import ProgressBar

            ctx = ProgressBar() if progress else _nullcontext()
            logger.info("Staging per-tile labels to %s …", stage_path)
            with ctx:
                dask.compute(
                    labeled.to_zarr(stage_path, component=staged_component, overwrite=True, compute=False)
                )

        # -- Resolve output path --
        if write_to is not None:
            effective_out = str(write_to)
        else:
            effective_out = os.path.join(
                tempfile.mkdtemp(prefix="bb_merge_"), "merged.zarr"
            )
            logger.info("write_to not set — merged labels in auto-temp %s", effective_out)

        # -- Merge --
        zarr_native_merge(
            stage_path, staged_component,
            effective_out, output_component,
            n_workers=nw,
            show_progress=progress,
        )

        if sequential_labels:
            logger.info("Relabelling to contiguous ids…")
            relabel_sequential_zarr(effective_out, output_component)
    finally:
        # -- Cleanup temp stage (only when we created it) --
        # Runs on failure as well, so a crashed merge does not leave a
        # (potentially large) stage store behind unless keep_stage was set.
        if owns_stage and not keep_stage:
            shutil.rmtree(stage_path, ignore_errors=True)
            if auto_stage_base is not None:
                # mkdtemp leaves directory removal to the caller; without
                # this an empty bb_stage_* directory leaks on every call.
                shutil.rmtree(auto_stage_base, ignore_errors=True)
            logger.info("Removed stage store %s", stage_path)

    return da.from_zarr(effective_out, component=output_component)