Merge pull request #148 from pycompression/bgzipperformance
Better BGZF format streaming performance.
rhpvorderman authored Sep 20, 2023
2 parents 92e4cf2 + 45c0a9d commit ecf9117
Showing 3 changed files with 115 additions and 72 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.rst
@@ -7,6 +7,17 @@ Changelog
.. This document is user facing. Please word the changes in such a way
.. that users understand how the changes affect the new version.
version 1.3.0-dev
-----------------
+ Gzip headers are now actively checked for a BGZF extra field. If found, the
  block size is taken into account when decompressing. This has further
  improved BGZF decompression speed by 5% on some files compared to the more
  generic solution of 1.2.0.
+ Integrated CPython 3.11 code for reading gzip headers. This leads to more
  commonality between the python-isal code and the upstream gzip.py code,
  which is what enabled the change above. It comes at the cost of a slight
  increase in overhead for the ``gzip.decompress`` function.
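
The entries above refer to the BGZF layout: a BGZF file is a series of small
gzip members, each carrying a ``BC`` extra subfield that stores the compressed
block size. The following standalone sketch (illustrative only, not part of
this commit) parses that subfield from the standard 28-byte BGZF EOF block:

import struct

# The canonical 28-byte BGZF end-of-file block: an empty gzip member whose
# FEXTRA field carries the "BC" subfield that the new header check looks for.
bgzf_eof = bytes.fromhex(
    "1f8b08040000000000ff0600424302001b0003000000000000000000")
magic, method, flags, mtime, xfl, os_flag, xlen = struct.unpack(
    "<HBBIBBH", bgzf_eof[:12])
assert magic == 0x8b1f and method == 8 and flags & 4  # gzip, deflate, FEXTRA
si1, si2, slen, bsize = struct.unpack("<BBHH", bgzf_eof[12:12 + xlen])
assert (si1, si2, slen) == (66, 67, 2)  # b"BC" subfield with a 2-byte payload
print(bsize)  # 27, i.e. the total block size minus one per the BGZF spec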

version 1.2.0
-----------------
+ Bgzip files are now detected and a smaller reading buffer is used to
170 changes: 100 additions & 70 deletions src/isal/igzip.py
@@ -220,20 +220,80 @@ def write(self, data):
        return length


def detect_bgzip(header: bytes) -> bool:
    if len(header) < 18:
        return False
    magic, method, flags, mtime, xfl, os, xlen, si1, si2, slen, bsize = \
        struct.unpack("<HBBIBBHBBHH", header[:18])
    return (
        method == 8 and  # Deflate method used
        flags & 4 and  # There are extra fields
        xlen == 6 and  # The extra field should be of length 6
        si1 == 66 and  # BGZIP magic number one
        si2 == 67 and  # BGZIP magic number two
        slen == 2  # The length of the 16 bit integer that stores
                   # the size of the block
    )
def _read_exact(fp, n):
    '''Read exactly *n* bytes from `fp`
    This method is required because fp may be unbuffered,
    i.e. return short reads.
    '''
    data = fp.read(n)
    while len(data) < n:
        b = fp.read(n - len(data))
        if not b:
            raise EOFError("Compressed file ended before the "
                           "end-of-stream marker was reached")
        data += b
    return data


def _read_gzip_header(fp):
    '''Read a gzip header from `fp` and progress to the end of the header.
    Returns None if header not present. Parses mtime from the header, looks
    for BGZF format blocks and parses the block size, setting it to None if
    not present. Returns a tuple of mtime, block_size if a header was present.
    '''
    # Do not use read_exact because a header may not be present. Read twice
    # since fp might be unbuffered.
    magic = fp.read(1) + fp.read(1)
    if magic == b'':
        return None

    if magic != b'\037\213':
        raise BadGzipFile('Not a gzipped file (%r)' % magic)

    common_fields = _read_exact(fp, 8)
    (method, flag, last_mtime) = struct.unpack("<BBIxx", common_fields)
    if method != 8:
        raise BadGzipFile('Unknown compression method')
    block_size = None
    if not flag:  # Likely when data compressed in memory
        return last_mtime, block_size
    header = magic + common_fields
    if flag & FEXTRA:
        # Read & discard the extra field, if present
        encoded_length = _read_exact(fp, 2)
        extra_len, = struct.unpack("<H", encoded_length)
        extra_field = _read_exact(fp, extra_len)
        # Bgzip file detection
        if extra_len == 6:
            s1, s2, slen, bsize = struct.unpack("<BBHH", extra_field)
            if s1 == 66 and s2 == 67 and slen == 2:
                # Bgzip magic and correct slen.
                block_size = bsize
        header = header + encoded_length + extra_field
    if flag & FNAME:
        # Read and discard a null-terminated string containing the filename
        while True:
            s = _read_exact(fp, 1)
            header += s
            if s == b'\000':
                break
    if flag & FCOMMENT:
        # Read and discard a null-terminated string containing a comment
        while True:
            s = _read_exact(fp, 1)
            header += s
            if s == b'\000':
                break
    if flag & FHCRC:
        header_crc_encoded = _read_exact(fp, 2)
        header_crc, = struct.unpack("<H", header_crc_encoded)
        crc = isal_zlib.crc32(header) & 0xFFFF
        if header_crc != crc:
            raise BadGzipFile(f"Corrupted gzip header. Checksums do not "
                              f"match: {crc:04x} != {header_crc:04x}")
    return last_mtime, block_size
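
A brief usage sketch of the new module-level helper (illustrative only, not
part of this diff). It relies on ``igzip.compress``, which appears further
down in this file, and assumes that ``compress`` writes the given ``mtime``
into the header verbatim, as CPython's gzip does:

import io
from isal import igzip

buf = io.BytesIO(igzip.compress(b"example", mtime=0))
mtime, block_size = igzip._read_gzip_header(buf)
assert (mtime, block_size) == (0, None)  # plain gzip member, no BGZF field
# buf.tell() is now positioned at the first byte of the raw deflate stream.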


class _PaddedFile(gzip._PaddedFile):
@@ -266,14 +326,21 @@ def __init__(self, fp):
        self._new_member = True
        self._last_mtime = None
        self._read_buffer_size = READ_BUFFER_SIZE
        if hasattr(fp, "peek") and detect_bgzip(fp.peek(18)):
            # bgzip consists of puny little blocks of max 64K uncompressed data
            # so in practice probably more around 16K in compressed size. A
            # 128K buffer is a massive overshoot and slows down the
            # decompression.
            # bgzip stores the block size, so it can be unpacked more
            # efficiently but this is outside scope for python-isal.
            self._read_buffer_size = 16 * 1024

    def _read_gzip_header(self):
        header_info = _read_gzip_header(self._fp)
        if header_info is None:
            return False
        # Get the BGZF block size from the header if present. If the read
        # buffer size is set to exactly the block size, there will be less
        # overhead as reading the file will stop right before the gzip trailer.
        # On normal gzip files nothing happens and this optimization is not
        # detrimental.
        last_mtime, block_size = header_info
        self._last_mtime = last_mtime
        self._read_buffer_size = (block_size if block_size is not None
                                  else READ_BUFFER_SIZE)
        return True

    def read(self, size=-1):
        if size < 0:
@@ -299,7 +366,9 @@ def read(self, size=-1):
            if self._new_member:
                # If the _new_member flag is set, we have to
                # jump to the next member, if there is one.
                self._init_read()
                self._crc = isal_zlib.crc32(b"")
                # Decompressed size of unconcatenated stream
                self._stream_size = 0
                if not self._read_gzip_header():
                    self._size = self._pos
                    return b""
@@ -363,61 +432,22 @@ def compress(data, compresslevel=_COMPRESS_LEVEL_BEST, *, mtime=None):
    return header + compressed


def _gzip_header_end(data: bytes) -> int:
    """
    Find the start of the raw deflate block in a gzip file.
    :param data: Compressed data that starts with a gzip header.
    :return: The end of the header / start of the raw deflate block.
    """
    eof_error = EOFError("Compressed file ended before the end-of-stream "
                         "marker was reached")
    if len(data) < 10:
        raise eof_error
    # We are not interested in mtime, xfl and os flags.
    magic, method, flags = struct.unpack("<HBB", data[:4])
    if magic != 0x8b1f:
        raise BadGzipFile(f"Not a gzipped file ({repr(data[:2])})")
    if method != 8:
        raise BadGzipFile("Unknown compression method")
    if not flags:  # Likely when data compressed in memory
        return 10
    pos = 10
    if flags & FEXTRA:
        if len(data) < pos + 2:
            raise eof_error
        xlen, = struct.unpack("<H", data[pos: pos+2])
        pos += 2 + xlen
    if flags & FNAME:
        pos = data.find(b"\x00", pos) + 1
        # pos will be -1 + 1 when null byte not found.
        if not pos:
            raise eof_error
    if flags & FCOMMENT:
        pos = data.find(b"\x00", pos) + 1
        if not pos:
            raise eof_error
    if flags & FHCRC:
        if len(data) < pos + 2:
            raise eof_error
        header_crc, = struct.unpack("<H", data[pos: pos+2])
        # CRC is stored as a 16-bit integer by taking last bits of crc32.
        crc = isal_zlib.crc32(data[:pos]) & 0xFFFF
        if header_crc != crc:
            raise BadGzipFile(f"Corrupted gzip header. Checksums do not "
                              f"match: {crc:04x} != {header_crc:04x}")
        pos += 2
    return pos


def decompress(data):
    """Decompress a gzip compressed string in one shot.
    Return the decompressed string.
    This function checks for extra gzip members. Using
    isal_zlib.decompress(data, wbits=31) is faster in cases where only one
    gzip member is guaranteed to be present.
    """
    decompressed_members = []
    while True:
        if not data:  # Empty data returns empty bytestring
            return b"".join(decompressed_members)
        header_end = _gzip_header_end(data)
        fp = io.BytesIO(data)
        if _read_gzip_header(fp) is None:
            return b"".join(decompressed_members)
        header_end = fp.tell()
        # Use a zlib raw deflate decompressor
        do = isal_zlib.decompressobj(wbits=-isal_zlib.MAX_WBITS)
        # Read all the data except the header
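
As a usage note for the ``decompress`` docstring above (illustrative only, not
part of this diff): the module-level function walks over concatenated gzip
members, whereas the faster one-shot ``isal_zlib.decompress`` call only needs
to handle a single member.

from isal import igzip, isal_zlib

two_members = igzip.compress(b"first ") + igzip.compress(b"second")
assert igzip.decompress(two_members) == b"first second"

# When exactly one member is guaranteed, the one-shot call mentioned in the
# docstring avoids the member-scanning loop:
one_member = igzip.compress(b"only one member")
assert isal_zlib.decompress(one_member, wbits=31) == b"only one member"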
6 changes: 4 additions & 2 deletions tests/test_igzip.py
@@ -392,8 +392,10 @@ def headers():


@pytest.mark.parametrize("header", list(headers()))
def test_gzip_header_end(header):
    assert igzip._gzip_header_end(header) == len(header)
def test_read_gzip_header_position(header):
    fp = io.BytesIO(header)
    igzip._read_gzip_header(fp)
    assert fp.tell() == len(header)


def test_header_too_short():
