Source code for numcodecs.gzip

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
import gzip as _gzip
import io


from .abc import Codec
from .compat import ensure_bytes, ensure_contiguous_ndarray, PY2


if PY2:  # pragma: py3 no cover
    from cStringIO import StringIO


[docs]class GZip(Codec): """Codec providing gzip compression using zlib via the Python standard library. Parameters ---------- level : int Compression level. """ codec_id = 'gzip' def __init__(self, level=1): self.level = level
[docs] def encode(self, buf): # normalise inputs buf = ensure_contiguous_ndarray(buf) if PY2: # pragma: py3 no cover # view as u1 needed on PY2 # ref: https://github.com/zarr-developers/numcodecs/pull/128#discussion_r236786466 buf = buf.view('u1') # do compression compressed = io.BytesIO() with _gzip.GzipFile(fileobj=compressed, mode='wb', compresslevel=self.level) as compressor: compressor.write(buf) compressed = compressed.getvalue() return compressed
# noinspection PyMethodMayBeStatic
[docs] def decode(self, buf, out=None): # normalise inputs if PY2: # pragma: py3 no cover # On Python 2, StringIO always uses the buffer protocol. buf = StringIO(ensure_contiguous_ndarray(buf)) else: # pragma: py2 no cover # BytesIO only copies if the data is not of `bytes` type. # This allows `bytes` objects to pass through without copying. buf = io.BytesIO(ensure_bytes(buf)) # do decompression with _gzip.GzipFile(fileobj=buf, mode='rb') as decompressor: if out is not None: out_view = ensure_contiguous_ndarray(out) decompressor.readinto(out_view) if decompressor.read(1) != b'': raise ValueError("Unable to fit data into `out`") else: out = decompressor.read() return out