Skip to content

Commit

Permalink
Merge pull request #29 from cta-observatory/chunked_reading
Browse files Browse the repository at this point in the history
Chunked reading
  • Loading branch information
maxnoe authored Mar 29, 2023
2 parents 8e45125 + 5ae84f7 commit ebaefd5
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 15 deletions.
25 changes: 12 additions & 13 deletions corsikaio/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
)
from .subblocks.longitudinal import longitudinal_header_dtype
from .subblocks.data import mmcs_cherenkov_photons_dtype
from .io import read_block, read_buffer_size, open_compressed
from .io import iter_blocks, read_buffer_size, open_compressed

from .constants import BLOCK_SIZE_BYTES, EVTH_VERSION_POSITION

Expand All @@ -30,7 +30,7 @@ def _to_floatarray(block):

class CorsikaFile:
"""
A file to iterate over events in a CORSIKA binary output file
A file to iterate over events in a CORSIKA binary output file.
"""

def __init__(self, path, parse_blocks=True):
Expand All @@ -39,8 +39,9 @@ def __init__(self, path, parse_blocks=True):
self.parse_blocks = parse_blocks
self._buffer_size = read_buffer_size(path)
self._f = open_compressed(path)
self._block_iter = iter_blocks(self._f)

runh_bytes = self.read_block()
runh_bytes = next(self._block_iter)
if not runh_bytes[:4] == b'RUNH':
raise ValueError('File does not start with b"RUNH"')

Expand All @@ -59,18 +60,18 @@ def run_end(self):
self._f.seek(-4, 2)

self._f.seek(-BLOCK_SIZE_BYTES, 1)
block = self.read_block()
block = self._f.read(BLOCK_SIZE_BYTES)
while block[:4] != b'RUNE':
self._f.seek(-2 * BLOCK_SIZE_BYTES, 1)
block = self.read_block()
block = self._f.read(BLOCK_SIZE_BYTES)

self._run_end = parse_run_end(block)[0]
self._f.seek(pos)

return self._run_end

def __next__(self):
block = self.read_block()
block = next(self._block_iter)

if block[:4] == b'RUNE':
self._run_end = parse_run_end(block)
Expand All @@ -87,15 +88,15 @@ def __next__(self):
data_bytes = bytearray()
long_bytes = bytearray()

block = self.read_block()
block = next(self._block_iter)
while block[:4] != b'EVTE':

if block[:4] == b'LONG':
long_bytes += block[longitudinal_header_dtype.itemsize:]
else:
data_bytes += block

block = self.read_block()
block = next(self._block_iter)

if self.parse_blocks:
event_end = parse_event_end(block)[0]
Expand All @@ -120,7 +121,8 @@ def read_headers(self):
pos = self._f.tell()
self._f.seek(0)

block = self.read_block()
block_iter = iter_blocks(self._f)
block = next(block_iter)
event_header_data = bytearray()
end_found = True

Expand All @@ -142,17 +144,14 @@ def read_headers(self):
elif block[:4] == b'EVTE':
end_found = True

block = self.read_block()
block = next(block_iter)

self._f.seek(pos)

event_headers = parse_event_header(event_header_data)

return self.run_header, event_headers, self._run_end

def read_block(self):
return read_block(self._f, self._buffer_size)

def __enter__(self):
return self

Expand Down
45 changes: 43 additions & 2 deletions corsikaio/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
from .constants import BLOCK_SIZE_BYTES


#: buffersize for files not written as fortran sequential file
DEFAULT_BUFFER_SIZE = BLOCK_SIZE_BYTES * 100
#: struct definition of the fortran record marker
RECORD_MARKER = struct.Struct('i')


def is_gzip(path):
'''Test if a file is gzipped by reading its first two bytes and compare
to the gzip marker bytes.
Expand Down Expand Up @@ -42,16 +48,51 @@ def read_buffer_size(path):
size of the CORSIKA buffer in bytes
'''
with open_compressed(path) as f:
data = f.read(4)
data = f.read(RECORD_MARKER.size)

if data == b'RUNH':
return None

buffer_size, = struct.unpack('I', data)
buffer_size, = RECORD_MARKER.unpack(data)

return buffer_size


def iter_blocks(f):
is_fortran_file = True
buffer_size = DEFAULT_BUFFER_SIZE


data = f.read(4)
f.seek(0)
if data == b'RUNH':
is_fortran_file = False

while True:
# for the fortran-chunked output, we need to read the record size
if is_fortran_file:
data = f.read(RECORD_MARKER.size)
if len(data) < RECORD_MARKER.size:
raise IOError("Read less bytes than expected, file seems to be truncated")

buffer_size, = RECORD_MARKER.unpack(data)

data = f.read(buffer_size)

n_blocks = len(data) // BLOCK_SIZE_BYTES
for block in range(n_blocks):
start = block * BLOCK_SIZE_BYTES
stop = start + BLOCK_SIZE_BYTES
block = data[start:stop]
if len(block) < BLOCK_SIZE_BYTES:
raise IOError("Read less bytes than expected, file seems to be truncated")
yield block

# read trailing record marker
if is_fortran_file:
f.read(RECORD_MARKER.size)


def read_block(f, buffer_size=None):
'''
Reads a block of CORSIKA output, e.g. 273 4-byte floats.
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ zstd =
zstandard
tests =
pytest
scipy
all =
%(zstd)s
%(tests)s
Expand Down
98 changes: 98 additions & 0 deletions tests/test_io.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import pytest
import numpy as np
from scipy.io import FortranFile

from corsikaio.io import iter_blocks


test_files = (
'tests/resources/mmcs65',
'tests/resources/corsika74100'
Expand Down Expand Up @@ -29,6 +36,97 @@ def test_read_block():
assert block[:4] == b'RUNH'


@pytest.fixture()
def simple_dummy_file(tmp_path):
path = tmp_path / "simple.dat"
data = bytearray(np.arange(273).astype(np.float32))

with path.open('wb') as f:
data[:4] = b'RUNH'
f.write(data)

for _ in range(5):
data[:4] = b'EVTH'
f.write(data)
for _ in range(3):
data[:4] = np.float32(0).tobytes()
f.write(data)
data[:4] = b'EVTE'
f.write(data)
data[:4] = b'RUNE'
f.write(data)
return path


@pytest.fixture()
def fortran_raw_dummy_file(tmp_path):
path = tmp_path / "fortran_raw.dat"
data = np.arange(273).astype(np.float32)


blocks = []
data[0] = np.frombuffer(b'RUNH', dtype=np.float32)
blocks.append(data.copy())

for _ in range(5):
data[0] = np.frombuffer(b'EVTH', dtype=np.float32)
blocks.append(data.copy())
for _ in range(3):
data[0] = 0.0
blocks.append(data.copy())
data[0] = np.frombuffer(b'EVTE', dtype=np.float32)
blocks.append(data.copy())
data[0] = np.frombuffer(b'RUNE', dtype=np.float32)
blocks.append(data.copy())

blocks_per_record = 5
with FortranFile(path, mode='w') as f:
n_records = len(blocks) // blocks_per_record + 1

for i in range(n_records):
start = i * blocks_per_record
stop = (i + 1) * blocks_per_record
print(start, stop, len(blocks))
f.write_record(np.concatenate(blocks[start:stop]))

return path


@pytest.fixture(params=["simple", "fortran_raw"])
def dummy_file(request, fortran_raw_dummy_file, simple_dummy_file):
if request.param == "simple":
return simple_dummy_file
else:
return fortran_raw_dummy_file


def test_iter_blocks_simple_file(dummy_file):
"""Test for iterblocks for the case of no record markers"""
data = np.arange(273).astype(np.float32)

with dummy_file.open('rb') as f:
block_it = iter_blocks(f)
block = next(block_it)
assert block[:4] == b'RUNH'
assert (np.frombuffer(block[4:], np.float32) == data[1:]).all()

for _ in range(5):
block = next(block_it)
assert block[:4] == b'EVTH'
assert (np.frombuffer(block[4:], np.float32) == data[1:]).all()
for _ in range(3):
block = next(block_it)
assert (np.frombuffer(block, np.float32) == data).all()
block = next(block_it)
assert block[:4] == b'EVTE'
assert (np.frombuffer(block[4:], np.float32) == data[1:]).all()

block = next(block_it)
assert block[:4] == b'RUNE'
assert (np.frombuffer(block[4:], np.float32) == data[1:]).all()



def test_versions():
from corsikaio.io import read_buffer_size, read_block
from corsikaio.subblocks import get_version
Expand Down

0 comments on commit ebaefd5

Please sign in to comment.