Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 91 additions & 36 deletions src/rasterix/rasterize/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ def _get_engine(engine: Engine | None) -> Engine:
try:
import rusterize as _ # noqa: F401
except ImportError as e:
raise ImportError("rusterize is not installed. Install it with: pip install rusterize") from e
raise ImportError(
"rusterize is not installed. Install it with: pip install rusterize"
) from e
elif engine == "rasterio":
try:
import rasterio as _ # noqa: F401
except ImportError as e:
raise ImportError("rasterio is not installed. Install it with: pip install rasterio") from e
raise ImportError(
"rasterio is not installed. Install it with: pip install rasterio"
) from e
elif engine == "exactextract":
try:
import exactextract as _ # noqa: F401
Expand Down Expand Up @@ -138,6 +142,7 @@ def rasterize(
obj: xr.Dataset | xr.DataArray,
geometries: gpd.GeoDataFrame | dask_geopandas.GeoDataFrame,
*,
field: str | None = None,
engine: Engine | None = None,
xdim: str = "x",
ydim: str = "y",
Expand All @@ -158,6 +163,10 @@ def rasterize(
Xarray object whose grid to rasterize onto.
geometries : GeoDataFrame
Either a geopandas or dask_geopandas GeoDataFrame.
field : str or None
Name of a column in ``geometries`` whose values will be burned into the
raster. When ``None`` (default), sequential integer indices are used.
The column must contain numeric values.
engine : {"rasterio", "rusterize", "exactextract"} or None
Rasterization engine to use. If None, auto-detects based on installed
packages (prefers rusterize if available, falls back to rasterio).
Expand Down Expand Up @@ -215,7 +224,9 @@ def rasterize(
exactextract.exact_extract
"""
if xdim not in obj.dims or ydim not in obj.dims:
raise ValueError(f"Received {xdim=!r}, {ydim=!r} but obj.dims={tuple(obj.dims)}")
raise ValueError(
f"Received {xdim=!r}, {ydim=!r} but obj.dims={tuple(obj.dims)}"
)

resolved_engine = _get_engine(engine)

Expand All @@ -234,53 +245,95 @@ def rasterize(
**engine_kwargs,
)

if is_in_memory(obj=obj, geometries=geometries):
geom_array = geometries.to_numpy().squeeze(axis=1)
rasterized = rasterize_geometries(
geom_array.tolist(),
shape=(obj.sizes[ydim], obj.sizes[xdim]),
offset=0,
dtype=np.min_scalar_type(len(geometries)),
fill=len(geometries),
**rasterize_kwargs,
if field is not None and field not in geometries.columns:
raise ValueError(
f"Column {field!r} not found in geometries. Available columns: {list(geometries.columns)}"
)

if is_in_memory(obj=obj, geometries=geometries):
geom_array = geometries.geometry.to_numpy()
if field is not None:
field_values = geometries[field].to_numpy()
dtype = np.result_type(field_values)
rasterized = rasterize_geometries(
geom_array.tolist(),
shape=(obj.sizes[ydim], obj.sizes[xdim]),
offset=0,
dtype=dtype,
fill=dtype.type(0),
values=field_values.tolist(),
**rasterize_kwargs,
)
else:
rasterized = rasterize_geometries(
geom_array.tolist(),
shape=(obj.sizes[ydim], obj.sizes[xdim]),
offset=0,
dtype=np.min_scalar_type(len(geometries)),
fill=len(geometries),
**rasterize_kwargs,
)
else:
from dask.array import from_array, map_blocks

map_blocks_args, chunks, geom_array = prepare_for_dask(
map_blocks_args, chunks, geom_array, field_array = prepare_for_dask(
obj,
geometries,
xdim=xdim,
ydim=ydim,
geoms_rechunk_size=geoms_rechunk_size,
field=field,
)
# DaskGeoDataFrame.len() computes!
num_geoms = geom_array.size
# with dask, we use 0 as a fill value and replace it later
dtype = np.min_scalar_type(num_geoms)
# add 1 to the offset, to account for 0 as fill value
npoffsets = np.cumsum(np.array([0, *geom_array.chunks[0][:-1]])) + 1
offsets = from_array(npoffsets, chunks=1)

rasterized = map_blocks(
dask_rasterize_wrapper,
*map_blocks_args,
offsets[:, np.newaxis, np.newaxis],
chunks=((1,) * geom_array.numblocks[0], chunks[YAXIS], chunks[XAXIS]),
meta=np.array([], dtype=dtype),
fill=0, # good identity value for both sum & replace.
**rasterize_kwargs,
dtype_=dtype,
)

if field_array is not None:
dtype = field_array.dtype

rasterized = map_blocks(
dask_rasterize_wrapper,
*map_blocks_args,
# offset is unused when values are provided, but still required positionally
from_array(np.zeros(geom_array.numblocks[0], dtype=int), chunks=1)[
:, np.newaxis, np.newaxis
],
chunks=((1,) * geom_array.numblocks[0], chunks[YAXIS], chunks[XAXIS]),
meta=np.array([], dtype=dtype),
fill=dtype.type(0),
values=field_array[:, np.newaxis, np.newaxis],
**rasterize_kwargs,
dtype_=dtype,
)
else:
# DaskGeoDataFrame.len() computes!
num_geoms = geom_array.size
# with dask, we use 0 as a fill value and replace it later
dtype = np.min_scalar_type(num_geoms)
# add 1 to the offset, to account for 0 as fill value
npoffsets = np.cumsum(np.array([0, *geom_array.chunks[0][:-1]])) + 1
offsets = from_array(npoffsets, chunks=1)

rasterized = map_blocks(
dask_rasterize_wrapper,
*map_blocks_args,
offsets[:, np.newaxis, np.newaxis],
chunks=((1,) * geom_array.numblocks[0], chunks[YAXIS], chunks[XAXIS]),
meta=np.array([], dtype=dtype),
fill=0, # good identity value for both sum & replace.
**rasterize_kwargs,
dtype_=dtype,
)

if merge_alg == "replace":
rasterized = rasterized.max(axis=0)
elif merge_alg == "add":
rasterized = rasterized.sum(axis=0)
else:
raise ValueError(f"Invalid merge_alg {merge_alg!r}. Must be one of: ['replace', 'add']")
raise ValueError(
f"Invalid merge_alg {merge_alg!r}. Must be one of: ['replace', 'add']"
)

# and reduce every other value by 1
rasterized = rasterized.map_blocks(partial(replace_values, to=num_geoms))
if field is None:
# and reduce every other value by 1
rasterized = rasterized.map_blocks(partial(replace_values, to=num_geoms))

return xr.DataArray(
dims=(ydim, xdim),
Expand Down Expand Up @@ -360,7 +413,9 @@ def geometry_mask(
rasterio.features.geometry_mask
"""
if xdim not in obj.dims or ydim not in obj.dims:
raise ValueError(f"Received {xdim=!r}, {ydim=!r} but obj.dims={tuple(obj.dims)}")
raise ValueError(
f"Received {xdim=!r}, {ydim=!r} but obj.dims={tuple(obj.dims)}"
)

resolved_engine = _get_engine(engine)

Expand Down Expand Up @@ -388,7 +443,7 @@ def geometry_mask(
else:
from dask.array import map_blocks

map_blocks_args, chunks, geom_array = prepare_for_dask(
map_blocks_args, chunks, geom_array, _ = prepare_for_dask(
obj,
geometries,
xdim=xdim,
Expand Down
15 changes: 14 additions & 1 deletion src/rasterix/rasterize/rasterio.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,22 @@ def dask_rasterize_wrapper(
all_touched: bool,
merge_alg: MergeAlg,
dtype_: np.dtype,
values: np.ndarray | None = None,
env: rio.Env | None = None,
) -> np.ndarray:
offset = offset_array.item()

if values is not None:
chunk_values = values[:, 0, 0].tolist()
else:
chunk_values = None

return rasterize_geometries(
geom_array[:, 0, 0].tolist(),
affine=affine * affine.translation(x_offsets.item(), y_offsets.item()),
shape=(y_sizes.item(), x_sizes.item()),
offset=offset,
values=chunk_values,
all_touched=all_touched,
merge_alg=merge_alg,
fill=fill,
Expand All @@ -85,14 +92,20 @@ def rasterize_geometries(
shape: tuple[int, int],
affine: Affine,
offset: int,
values: Sequence[Any] | None = None,
env: rio.Env | None = None,
clear_cache: bool = False,
**kwargs,
):
from rasterio.features import rasterize as rasterize_rio

if values is not None:
shapes = zip(geometries, values, strict=True)
else:
shapes = zip(geometries, range(offset, offset + len(geometries)), strict=True)

res = rasterize_rio(
zip(geometries, range(offset, offset + len(geometries)), strict=True),
shapes,
out_shape=shape,
transform=affine,
**kwargs,
Expand Down
45 changes: 39 additions & 6 deletions src/rasterix/rasterize/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,23 @@ def clip_to_bbox(obj: xr.Dataset, geometries: gpd.GeoDataFrame) -> xr.Dataset: .
@overload
def clip_to_bbox(obj: xr.DataArray, geometries: gpd.GeoDataFrame) -> xr.DataArray: ...
def clip_to_bbox(
obj: xr.Dataset | xr.DataArray, geometries: gpd.GeoDataFrame, *, xdim: str, ydim: str
obj: xr.Dataset | xr.DataArray,
geometries: gpd.GeoDataFrame,
*,
xdim: str,
ydim: str,
) -> xr.Dataset | xr.DataArray:
bbox = geometries.total_bounds
if not hasattr(bbox, "chunks"):
y = obj[ydim].data
if y[0] < y[-1]:
obj = obj.sel({xdim: slice(bbox[0], bbox[2]), ydim: slice(bbox[1], bbox[3])})
obj = obj.sel(
{xdim: slice(bbox[0], bbox[2]), ydim: slice(bbox[1], bbox[3])}
)
else:
obj = obj.sel({xdim: slice(bbox[0], bbox[2]), ydim: slice(bbox[3], bbox[1])})
obj = obj.sel(
{xdim: slice(bbox[0], bbox[2]), ydim: slice(bbox[3], bbox[1])}
)
return obj


Expand Down Expand Up @@ -60,13 +68,28 @@ def geometries_as_dask_array(
return geometries.to_dask_array(lengths=chunks).squeeze(axis=1)


def _field_as_dask_array(
    geometries: gpd.GeoDataFrame | dask_geopandas.GeoDataFrame,
    field: str,
    chunks: tuple[int, ...],
) -> dask.array.Array:
    """Return the ``field`` column of ``geometries`` as a dask array chunked like ``chunks``."""
    from dask.array import from_array

    if not isinstance(geometries, gpd.GeoDataFrame):
        # dask_geopandas frame: convert lazily, pinning partition lengths
        # so the result aligns chunk-for-chunk with the geometry array.
        return geometries[field].to_dask_array(lengths=chunks)
    # In-memory geopandas frame: materialize the column, then wrap it.
    column_values = geometries[field].to_numpy()
    return from_array(column_values, chunks=chunks)


def prepare_for_dask(
obj: xr.Dataset | xr.DataArray,
geometries,
*,
xdim: str,
ydim: str,
geoms_rechunk_size: int | None,
field: str | None = None,
):
from dask.array import from_array

Expand All @@ -78,10 +101,20 @@ def prepare_for_dask(
if geoms_rechunk_size is not None:
geom_array = geom_array.rechunk({0: geoms_rechunk_size})

field_array = None
if field is not None:
field_array = _field_as_dask_array(geometries, field, geom_array.chunks[0])
if geoms_rechunk_size is not None:
field_array = field_array.rechunk({0: geoms_rechunk_size})

x_sizes = from_array(chunks[XAXIS], chunks=1)
y_sizes = from_array(chunks[YAXIS], chunks=1)
y_offsets = from_array(np.cumulative_sum(chunks[YAXIS][:-1], include_initial=True), chunks=1)
x_offsets = from_array(np.cumulative_sum(chunks[XAXIS][:-1], include_initial=True), chunks=1)
y_offsets = from_array(
np.cumulative_sum(chunks[YAXIS][:-1], include_initial=True), chunks=1
)
x_offsets = from_array(
np.cumulative_sum(chunks[XAXIS][:-1], include_initial=True), chunks=1
)

map_blocks_args = (
geom_array[:, np.newaxis, np.newaxis],
Expand All @@ -90,4 +123,4 @@ def prepare_for_dask(
x_sizes[np.newaxis, np.newaxis, :],
y_sizes[np.newaxis, :, np.newaxis],
)
return map_blocks_args, chunks, geom_array
return map_blocks_args, chunks, geom_array, field_array
Loading