Skip to content

Commit

Permalink
Refactor tifffile compression auto-guesses
Browse files Browse the repository at this point in the history
- fix predictor computations
- make things work with rasterio like params
- rechunk if needed
  • Loading branch information
Kirill888 committed Sep 22, 2023
1 parent 7e3fc91 commit dde6d2f
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 43 deletions.
172 changes: 134 additions & 38 deletions odc/geo/_cog.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@

AxisOrder = Union[Literal["YX"], Literal["YXS"], Literal["SYX"]]

# Map a GDAL compressor name to the GDAL creation-option key that carries its
# "level"-like tuning knob, so rasterio/GDAL-style keyword args can be
# translated into tifffile's ``compressionargs``.
# NOTE(review): for the LERC family the knob is MAX_Z_ERROR (a max error
# tolerance, not a compression level) — same lookup mechanism, different units.
_GDAL_COMP: Dict[str, str] = {
    "DEFLATE": "ZLEVEL",
    "ADOBE_DEFLATE": "ZLEVEL",
    "ZSTD": "ZSTD_LEVEL",
    "WEBP": "WEBP_LEVEL",
    "LERC": "MAX_Z_ERROR",
    "LERC_DEFLATE": "MAX_Z_ERROR",
    "LERC_ZSTD": "MAX_Z_ERROR",
    "JPEG": "JPEG_QUALITY",
}


@dataclass
class CogMeta:
Expand Down Expand Up @@ -650,28 +662,15 @@ def _yaxis_from_shape(
raise ValueError("Geobox and image shape do not match")


def _norm_predictor(predictor: Union[int, bool], dtype: Any) -> int:
if isinstance(predictor, int):
return predictor

dtype = np.dtype(dtype)
if not predictor:
return 1
if dtype.kind == "f":
return 3
if dtype.kind in "ui" and dtype.itemsize <= 4:
return 2
return 1


def make_empty_cog(
def _make_empty_cog(
shape: Tuple[int, ...],
dtype: Any,
gbox: Optional[GeoBox] = None,
*,
nodata: MaybeNodata = None,
gdal_metadata: Optional[str] = None,
compression: Union[str, Unset] = Unset(),
compressionargs: Any = None,
predictor: Union[int, bool, Unset] = Unset(),
blocksize: Union[int, List[Union[int, Tuple[int, int]]]] = 2048,
bigtiff: bool = True,
Expand All @@ -684,23 +683,18 @@ def make_empty_cog(
FILETYPE,
PHOTOMETRIC,
PLANARCONFIG,
TIFF,
TiffWriter,
enumarg,
)

if isinstance(compression, Unset):
compression = str(kw.pop("compress", "ADOBE_DEFLATE"))
compression = compression.upper()
compression = {"DEFLATE": "ADOBE_DEFLATE"}.get(compression, compression)
_compression = int(COMPRESSION[compression])

if isinstance(predictor, Unset):
if _compression in TIFF.IMAGE_COMPRESSIONS:
predictor = 1
else:
predictor = _norm_predictor(True, dtype)
else:
predictor = _norm_predictor(predictor, dtype)
predictor, compression, compressionargs = _norm_compression_tifffile(
dtype,
predictor,
compression=compression,
compressionargs=compressionargs,
kw=kw,
)
_compression = enumarg(COMPRESSION, compression.upper())

if isinstance(blocksize, int):
blocksize = [blocksize]
Expand All @@ -727,6 +721,7 @@ def make_empty_cog(
"planarconfig": planarconfig,
"predictor": predictor,
"compression": _compression,
"compressionargs": compressionargs,
"software": False,
**kw,
}
Expand Down Expand Up @@ -757,7 +752,14 @@ def _sh(shape: Shape2d) -> Tuple[int, ...]:
for tsz, idx in zip(_blocks, range(nlevels + 1)):
tile = _norm_blocksize(tsz)
meta = CogMeta(
ax, im_shape, shape_(tile), nsamples, dtype, _compression, predictor, gbox
ax,
im_shape,
shape_(tile),
nsamples,
dtype,
int(_compression),
predictor,
gbox,
)

if idx == 0:
Expand Down Expand Up @@ -849,6 +851,7 @@ def compress_tiles(
meta: CogMeta,
scale_idx: int = 0,
sample_idx: int = 0,
compressionargs: Any = None,
**encoder_params,
):
"""
Expand All @@ -870,9 +873,15 @@ def compress_tiles(
assert meta.num_planes == 1
src_ydim = 0 # for now assume Y,X or Y,X,S

if compressionargs is not None:
encoder_params.update(**compressionargs)

encoder = mk_tile_compressor(meta, **encoder_params)
data = xx.data

if data.chunksize != meta.chunks:
data = data.rechunk(meta.chunks)

assert is_dask_collection(data)

tk = tokenize(
Expand Down Expand Up @@ -971,39 +980,122 @@ def _update_header(
return bytes(_bio.getbuffer())


def _norm_predictor(predictor: Union[int, bool, None], dtype: Any) -> int:
if predictor is False or predictor is None:
return 1

if predictor is True:
dtype = np.dtype(dtype)
if dtype.kind == "f":
return 3
if dtype.kind in "ui" and dtype.itemsize <= 4:
return 2
return 1
return predictor


def _norm_compression_tifffile(
    dtype: Any,
    predictor: Union[bool, None, int, Unset] = Unset(),
    compression: Union[str, Unset] = Unset(),
    compressionargs: Any = None,
    level: Optional[Union[int, float]] = None,
    kw: Optional[Dict[str, Any]] = None,
) -> Tuple[int, str, Dict[str, Any]]:
    """Normalize compression configuration for ``tifffile``.

    Accepts GDAL/rasterio-style options (``compress=``, ``ZLEVEL=``,
    ``ZSTD_LEVEL=``, ...) supplied via ``kw`` and translates them into the
    ``(predictor, compression, compressionargs)`` triple that ``tifffile``
    expects. Matching GDAL-compat keys are *popped* from ``kw`` (``kw`` is
    mutated in place) so that leftovers can be passed through untouched.

    :param dtype: pixel dtype, used to auto-pick a predictor when requested.
    :param predictor: ``Unset()`` -> auto (on for deflate/zstd/lzma family),
       else passed to ``_norm_predictor``.
    :param compression: compressor name; ``Unset()`` -> ``kw["compress"]``
       if present, else ``"ADOBE_DEFLATE"``.
    :param compressionargs: extra args for the codec; a new dict when ``None``.
    :param level: compression level; when ``None`` it is looked up from
       GDAL-style keys in ``kw`` (case-insensitive).
    :param kw: remaining user options, mutated by popping consumed keys.
    :return: ``(predictor, compression, compressionargs)`` tuple.
    """
    if kw is None:
        kw = {}
    if isinstance(compression, Unset):
        # GDAL compat: honour `compress=` keyword, default to deflate
        compression = kw.pop("compress", "ADOBE_DEFLATE")
    assert isinstance(compression, str)

    if compressionargs is None:
        compressionargs = {}

    # Case-insensitive view of kw: UPPER name -> original key
    remap = {k.upper(): k for k in kw}

    def opt(name: str, default=None) -> Any:
        # Pop `name` from kw case-insensitively, or return default.
        k = remap.get(name.upper(), None)
        if k is None:
            return default
        return kw.pop(k, default)

    def _gdal_level(compression: str, default=None) -> Any:
        # Pop the GDAL-style level option for this compressor, if any.
        gdal_level_k = _GDAL_COMP.get(compression, None)
        if gdal_level_k is None:
            return default
        return opt(gdal_level_k, default)

    compression = compression.upper()

    if level is None and "level" not in compressionargs:
        # GDAL compat
        level = _gdal_level(compression)

    if level is not None:
        compressionargs["level"] = level

    if compression == "DEFLATE":
        # tifffile's name for GDAL's DEFLATE
        compression = "ADOBE_DEFLATE"
    if compression == "LERC_DEFLATE":
        # GDAL's combined codec -> LERC with a nested deflate stage;
        # the secondary level rides in a nested "compressionargs" dict.
        compression = "LERC"
        compressionargs["compression"] = "deflate"
        if (lvl := _gdal_level("DEFLATE")) is not None:
            compressionargs["compressionargs"] = {"level": lvl}
    elif compression == "LERC_ZSTD":
        compression = "LERC"
        compressionargs["compression"] = "zstd"
        if (lvl := _gdal_level("ZSTD")) is not None:
            compressionargs["compressionargs"] = {"level": lvl}

    if isinstance(predictor, Unset):
        # Auto predictor only makes sense for lossless byte-oriented codecs
        predictor = compression in ("ADOBE_DEFLATE", "ZSTD", "LZMA")

    predictor = _norm_predictor(predictor, dtype)
    return (predictor, compression, compressionargs)


def save_cog_with_dask(
xx: xr.DataArray,
dst: str = "",
*,
client: Any = None,
compression: Union[str, Unset] = Unset(),
compressionargs: Any = None,
level: Optional[Union[int, float]] = None,
predictor: Union[int, bool, Unset] = Unset(),
blocksize: Union[int, List[Union[int, Tuple[int, int]]]] = 2048,
blocksize: Union[Unset, int, List[Union[int, Tuple[int, int]]]] = Unset(),
bigtiff: bool = True,
overview_resampling: Union[int, str] = "nearest",
verbose: bool = False,
encoder_params: Any = None,
client: Any = None,
**kw,
):
# pylint: disable=import-outside-toplevel
t0 = monotonic()
from dask import bag, delayed
from dask.utils import format_time

if encoder_params is None:
encoder_params = {}

# useful when debugging
optimize_graph = kw.pop("optimize_graph", True)

meta, hdr0 = make_empty_cog(
# normalize compression and remove GDAL compat options from kw
predictor, compression, compressionargs = _norm_compression_tifffile(
xx.dtype, predictor, compression, compressionargs, level=level, kw=kw
)
ydim = xx.odc.ydim
data_chunks: Tuple[int, int] = xx.data.chunksize[ydim : ydim + 2]
if isinstance(blocksize, Unset):
blocksize = [data_chunks, int(max(*data_chunks) // 2)]

meta, hdr0 = _make_empty_cog(
xx.shape,
xx.dtype,
xx.odc.geobox,
predictor=predictor,
compression=compression,
compressionargs=compressionargs,
blocksize=blocksize,
bigtiff=bigtiff,
nodata=xx.odc.nodata,
**kw,
)
hdr0 = bytes(hdr0)
Expand All @@ -1012,7 +1104,11 @@ def save_cog_with_dask(

_tiles = []
for scale_idx, (mm, img) in enumerate(zip(meta.flatten(), layers)):
_tiles.append(compress_tiles(img, mm, scale_idx=scale_idx, **encoder_params))
_tiles.append(
compress_tiles(
img, mm, scale_idx=scale_idx, compressionargs=compressionargs
)
)

hdr_info = bag.concat(
[t.map(lambda d: (*d["idx"], len(d["data"]))) for t in _tiles[::-1]]
Expand Down
27 changes: 22 additions & 5 deletions tests/test_cog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from odc.geo._cog import (
CogMeta,
_compute_cog_spec,
_make_empty_cog,
_norm_compression_tifffile,
_num_overviews,
cog_gbox,
make_empty_cog,
)
from odc.geo.geobox import GeoBox
from odc.geo.gridspec import GridSpec
Expand Down Expand Up @@ -143,7 +144,12 @@ def test_cog_gbox(gbox: GeoBox, kw):
("int16", "deflate", 2),
("int16", "zstd", 2),
("uint8", "webp", 1),
("float32", Unset(), 3),
("int16", Unset(), 2),
("float32", "zstd", 3),
("float32", "adobe_deflate", 3),
("float32", "lerc", 1),
("float32", "lerc_deflate", 1),
("float32", "lerc_zstd", 1),
],
)
def test_empty_cog(shape, blocksize, expect_ax, dtype, compression, expect_predictor):
Expand All @@ -156,7 +162,7 @@ def test_empty_cog(shape, blocksize, expect_ax, dtype, compression, expect_predi
gbox = gbox.zoom_to(shape[:2])
assert gbox.shape == shape[:2]

meta, mm = make_empty_cog(
meta, mm = _make_empty_cog(
shape,
dtype,
gbox=gbox,
Expand All @@ -183,8 +189,12 @@ def test_empty_cog(shape, blocksize, expect_ax, dtype, compression, expect_predi

if isinstance(compression, str):
compression = compression.upper()
compression = {"DEFLATE": "ADOBE_DEFLATE"}.get(compression, compression)
assert p.compression.name == compression
if compression == "DEFLATE":
assert p.compression.name == "ADOBE_DEFLATE"
elif compression.startswith("LERC_"):
assert p.compression.name == "LERC"
else:
assert p.compression.name == compression
else:
# should default to deflate
assert p.compression == 8
Expand Down Expand Up @@ -255,3 +265,10 @@ def test_cog_meta(meta: CogMeta):
]:
with pytest.raises(IndexError):
_ = meta.flat_tile_idx(bad_idx)


def test_norm_compress():
    """Defaults for int16: adobe-deflate, horizontal predictor, no extra args."""
    result = _norm_compression_tifffile("int16")
    assert result == (2, "ADOBE_DEFLATE", {})

0 comments on commit dde6d2f

Please sign in to comment.