
Commit

Merge remote-tracking branch 'origin/develop' into nick/copy-link-outside
nfrasser committed Mar 20, 2024
2 parents 19e0c99 + 6c88a93 commit d0dd71c
Showing 13 changed files with 591 additions and 213 deletions.
12 changes: 12 additions & 0 deletions .editorconfig
@@ -0,0 +1,12 @@
root = true

[*]
charset = utf-8
indent_style = space
indent_size = 4
end_of_line = lf
trim_trailing_whitespace = true
insert_final_newline = true

[*.md]
indent_size = 2
59 changes: 33 additions & 26 deletions cryosparc/core.pyx
@@ -1,6 +1,6 @@
from . cimport dataset
from . cimport lz4
from libc.stdint cimport uint8_t, uint32_t, uint64_t
from libc.stdint cimport uint8_t, uint16_t, uint64_t
from cpython.ref cimport PyObject, Py_XINCREF, Py_XDECREF
from cpython.mem cimport PyMem_Realloc, PyMem_Free

@@ -43,7 +43,7 @@ cdef class Data:
self._handle = dataset.dset_new()

if self._handle == 0:
raise MemoryError()
raise MemoryError("Could not allocate dataset")

def __dealloc__(self):
if self._handle:
@@ -104,26 +104,34 @@ cdef class Data:
return self.type(field) > 0

def addrows(self, int num):
return dataset.dset_addrows(self._handle, num)
if not dataset.dset_addrows(self._handle, num):
raise MemoryError("Could not add rows to dataset")

def addcol_scalar(self, str field, int dtype):
return dataset.dset_addcol_scalar(self._handle, field.encode(), dtype)
if not dataset.dset_addcol_scalar(self._handle, field.encode(), dtype):
raise MemoryError("Could not add column to dataset")

def addcol_array(self, str field, int dtype, int shape0, int shape1, int shape2):
return dataset.dset_addcol_array(self._handle, field.encode(), dtype, shape0, shape1, shape2)
def addcol_array(self, str field, int dtype, tuple shape):
if not 0 < len(shape) <= 3:
raise ValueError("Shape must have between 1 and 3 dimensions")

cdef uint16_t c_shape[3]
c_shape[0] = 0; c_shape[1] = 0; c_shape[2] = 0
for i in range(len(shape)):
c_shape[i] = <uint16_t> shape[i]

if not dataset.dset_addcol_array(self._handle, field.encode(), dtype, c_shape):
raise MemoryError("Could not add column to dataset")

def getshp(self, str colkey):
cdef list shp = []
cdef uint32_t val = dataset.dset_getshp(self._handle, colkey.encode())
cdef uint8_t dim0 = <uint8_t> (val & 0xFF)
cdef uint8_t dim1 = <uint8_t> ((val >> 8) & 0xFF)
cdef uint8_t dim2 = <uint8_t> ((val >> 16) & 0xFF)
if dim0:
shp.append(<int> dim0)
if dim1:
shp.append(<int> dim1)
if dim2:
shp.append(<int> dim2)
cdef uint64_t val = dataset.dset_getshp(self._handle, colkey.encode())
cdef uint16_t dim0 = <uint16_t> (val & 0xFFFF)
cdef uint16_t dim1 = <uint16_t> ((val >> 16) & 0xFFFF)
cdef uint16_t dim2 = <uint16_t> ((val >> 32) & 0xFFFF)
if dim0: shp.append(<int> dim0)
if dim1: shp.append(<int> dim1)
if dim2: shp.append(<int> dim2)
return tuple(shp)

def getbuf(self, str colkey):
@@ -134,10 +142,7 @@ cdef class Data:
with nogil:
mem = dataset.dset_get(self._handle, colkey_c)
size = dataset.dset_getsz(self._handle, colkey_c)
if size == 0:
return 0
else:
return <unsigned char [:size]> mem
return 0 if size == 0 else <unsigned char [:size]> mem

def getstr(self, str col, size_t index):
return dataset.dset_getstr(self._handle, col.encode(), index) # returns bytes
@@ -160,7 +165,8 @@ cdef class Data:
pybytes = pystr.encode()
Py_XDECREF(pycol[i]) # so string is deallocated
pycol[i] = NULL # so that strfree not attempted
dataset.dset_setstr(self._handle, colkey, i, pybytes, len(pybytes))
if not dataset.dset_setstr(self._handle, colkey, i, pybytes, len(pybytes)):
raise MemoryError("Could not convert strings")

return True

@@ -196,7 +202,7 @@ cdef class Data:
cdef uint64_t idx
cdef bytes pybytes = val.encode()
if not dataset.dset_stralloc(self._handle, pybytes, len(pybytes), &idx):
raise MemoryError()
raise MemoryError("Could not allocate string in dataset")
return <int> idx

def dump(self):
@@ -217,10 +223,11 @@ cdef class Data:

def setstrheap(self, bytes heap):
if not dataset.dset_setstrheap(self._handle, <const char *> heap, len(heap)):
raise MemoryError()
raise MemoryError("Could not set string heap in dataset")

def defrag(self, bint realloc_smaller):
return dataset.dset_defrag(self._handle, realloc_smaller)
if not dataset.dset_defrag(self._handle, realloc_smaller):
raise MemoryError("Could not defrag dataset")

def dumptxt(self, bint dump_data = 0):
dataset.dset_dumptxt(self._handle, dump_data)
@@ -311,7 +318,7 @@ cdef class Stream:
pybytes = pystr.encode()
allocres = dataset.dset_stralloc(handle, pybytes, len(pybytes), &idx)
if allocres == 0:
raise MemoryError()
raise MemoryError("Could not allocate string in dataset")
elif allocres == 2:
# dataset reallocated, coldata must be retrieved
coldata = <PyObject **> dataset.dset_get(handle, colkey)
@@ -358,7 +365,7 @@ cdef class Stream:
pybytes = pystr.encode()
allocres = dataset.dset_stralloc(handle, pybytes, len(pybytes), &idx)
if allocres == 0:
raise MemoryError()
raise MemoryError("Could not allocate string in dataset for compression")
elif allocres == 2:
# dataset reallocated, coldata must be retrieved
coldata = <PyObject **> dataset.dset_get(handle, colkey)
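A minimal usage sketch (not part of this commit) of the updated cryosparc/core.pyx API: Data.addrows and the addcol_* methods now raise MemoryError on failure instead of returning a status, and addcol_array takes the column shape as a tuple of 1 to 3 dimensions rather than three separate shape0/shape1/shape2 arguments. The field names and DsetType members below are illustrative assumptions, not values taken from the diff.

```python
# Hedged sketch; assumes cryosparc-tools is installed and that DsetType
# lives in cryosparc.dtype (its import is not shown in this diff).
from cryosparc.core import Data
from cryosparc.dtype import DsetType

data = Data()    # assumed: a bare Data() allocates a fresh dataset handle
data.addrows(8)  # raises MemoryError on failure (previously returned a bool)

# Shape is now passed as a tuple with 1 to 3 dimensions; "location/center"
# and T_F32 are hypothetical examples.
data.addcol_array("location/center", DsetType.T_F32, (2,))

# Scalar columns are unchanged apart from the new MemoryError behaviour.
data.addcol_scalar("uid_hash", DsetType.T_U64)
```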
4 changes: 2 additions & 2 deletions cryosparc/dataset.pxd
@@ -1,4 +1,4 @@
from libc.stdint cimport uint64_t, uint32_t
from libc.stdint cimport uint64_t, uint16_t, uint32_t
ctypedef uint64_t Dset

cdef extern from "cryosparc-tools/dataset.h":
@@ -21,7 +21,7 @@ cdef extern from "cryosparc-tools/dataset.h":

bint dset_addrows(Dset dset, uint32_t num) nogil
bint dset_addcol_scalar(Dset dset, const char *key, int type) nogil
bint dset_addcol_array(Dset dset, const char *key, int type, int shape0, int shape1, int shape2) nogil
bint dset_addcol_array(Dset dset, const char *key, int type, const uint16_t *shape) nogil
bint dset_changecol(Dset dset, const char *key, int type) nogil

bint dset_defrag(Dset dset, bint realloc_smaller) nogil
39 changes: 28 additions & 11 deletions cryosparc/dataset.py
@@ -24,7 +24,7 @@
"""

from functools import reduce
from functools import lru_cache, reduce
from pathlib import PurePath
from typing import (
IO,
@@ -272,10 +272,14 @@ def append_many(
Returns:
Dataset: Appended dataset
"""
datasets = tuple(d for d in datasets if len(d) > 0) # skip empty datasets
if not datasets:
return cls()

first_dset = datasets[0]
datasets = tuple(d for d in datasets if len(d) > 0) # skip empty datasets
if not datasets:
return cls(first_dset) # keep the same fields as the first

if not repeat_allowed:
all_uids = n.concatenate([dset["uid"] for dset in datasets])
assert len(all_uids) == len(n.unique(all_uids)), "Cannot append datasets that contain the same UIDs."
@@ -350,7 +354,14 @@ def union_many(
Returns:
Dataset: combined dataset, or empty dataset if none are provided.
"""
if not datasets:
return cls()

first_dset = datasets[0]
datasets = tuple(d for d in datasets if len(d) > 0) # skip empty datasets
if not datasets:
return cls(first_dset) # keep the same fields as the first

keep_fields = cls.common_fields(*datasets, assert_same_fields=assert_same_fields)
keep_masks = []
keep_uids = n.array([], dtype=n.uint64)
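The new guard clauses above change how Dataset.append_many and Dataset.union_many treat empty inputs: empty datasets are skipped, and if every input is empty the result is an empty dataset that still carries the first input's fields. A rough behavioural sketch follows (not from the commit); Dataset.allocate, the field definition, and the *datasets calling convention are assumptions about the surrounding cryosparc-tools API.

```python
# Hedged sketch; assumes Dataset.allocate(size, fields) and Dataset.fields()
# behave as elsewhere in cryosparc-tools, and that append_many takes *datasets.
from cryosparc.dataset import Dataset

empty = Dataset.allocate(0, [("location/center", "<f4", (2,))])  # hypothetical field
combined = Dataset.append_many(empty, empty)

assert len(combined) == 0
# Result keeps the first input's fields instead of collapsing to a bare dataset
assert "location/center" in combined.fields()
```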
Expand Down Expand Up @@ -594,6 +605,18 @@ def load(cls, file: Union[str, PurePath, IO[bytes]], cstrs: bool = False):
except Exception as err:
raise DatasetLoadError(f"Could not load dataset from file {file}") from err

@classmethod
def load_cached(cls, file: Union[str, PurePath, IO[bytes]], cstrs: bool = False):
"""Replicate Dataset.from_file but with cacheing.
This can significantly speed up end-of-job validation with a large number of outputs (e.g., 3D Classification)
"""
return cls._load_cached(file, cstrs).copy()

@classmethod
@lru_cache(maxsize=None)
def _load_cached(cls, file: Union[str, PurePath, IO[bytes]], cstrs: bool = False):
return cls.load(file, cstrs)

@classmethod
async def from_async_stream(cls, stream: AsyncBinaryIO):
headersize = u32intle(await stream.read(4))
@@ -1020,19 +1043,13 @@ def add_fields(
dt = n.dtype(fielddtype(field))
if dt.shape:
assert dt.base.type in TYPE_TO_DSET_MAP, f"Unsupported column data type {dt.base}"
shape = [0] * 3
shape[0 : len(dt.shape)] = dt.shape
assert self._data.addcol_array(
name, TYPE_TO_DSET_MAP[dt.base.type], *shape
), f"Could not add {field} with dtype {dt}"
self._data.addcol_array(name, TYPE_TO_DSET_MAP[dt.base.type], dt.shape)
elif dt.char in {"O", "S", "U"}: # all python string object types
assert self._data.addcol_scalar(name, DsetType.T_OBJ), f"Could not add {field} with dtype {dt}"
self._data.addcol_scalar(name, DsetType.T_OBJ)
self[name] = "" # Reset object field to empty string
else:
assert dt.type in TYPE_TO_DSET_MAP, f"Unsupported column data type {dt}"
assert self._data.addcol_scalar(
name, TYPE_TO_DSET_MAP[dt.type]
), f"Could not add {field} with dtype {dt}"
self._data.addcol_scalar(name, TYPE_TO_DSET_MAP[dt.type])

return self._reset()

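The new Dataset.load_cached / _load_cached pair added above memoizes parsed datasets with functools.lru_cache and hands each caller a copy. A hedged usage sketch (not from the commit); the file name is hypothetical.

```python
# Hedged sketch of the new caching loader; assumes cryosparc-tools is
# installed and "particles.cs" is a dataset file on disk.
from cryosparc.dataset import Dataset

a = Dataset.load_cached("particles.cs")  # first call reads and parses the file
b = Dataset.load_cached("particles.cs")  # cache hit: no second disk read

# Each call returns a .copy() of the memoized dataset, so callers can
# mutate their result without poisoning the lru_cache entry.
assert a is not b
```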
47 changes: 20 additions & 27 deletions cryosparc/include/cryosparc-tools/dataset.h
@@ -64,11 +64,11 @@ void * dset_get (uint64_t dset, const char * colkey);
uint64_t dset_getsz (uint64_t dset, const char * colkey);
int dset_setstr (uint64_t dset, const char * colkey, uint64_t index, const char * value, size_t length);
const char* dset_getstr (uint64_t dset, const char * colkey, uint64_t index);
uint32_t dset_getshp (uint64_t dset, const char * colkey);
uint64_t dset_getshp (uint64_t dset, const char * colkey);

int dset_addrows (uint64_t dset, uint32_t num);
int dset_addcol_scalar (uint64_t dset, const char * key, int type);
int dset_addcol_array (uint64_t dset, const char * key, int type, int shape0, int shape1, int shape2);
int dset_addcol_array (uint64_t dset, const char * key, int type, const uint16_t *shape);
int dset_changecol (uint64_t dset, const char * key, int type);

int dset_defrag (uint64_t dset, int realloc_smaller);
@@ -224,7 +224,7 @@ typedef struct {
uint64_t longkey;
};
int8_t type; // magnitude matches enum dset_type. negative means use longkey
uint8_t shape[3]; // safe to leave as zero for scalars
uint16_t shape[3]; // safe to leave as zero for scalars
uint64_t offset; // relative to start of respective heap

} ds_column;
@@ -546,7 +546,7 @@ getkey(const ds *d, const ds_column *c)
}

static ds_column *
column_lookup(ds * d, const char *colkey, size_t *idx)
column_lookup(ds *d, const char *colkey, uint64_t *idx)
{
if(!d) return 0;

@@ -957,10 +957,10 @@ stralloc(uint64_t dsetidx, const char *str, size_t len, uint64_t *index) {
return d;
}

static inline char *
getstr(ds *d, uint64_t col, uint64_t index) {
char *ptr = (char *) d;
uint64_t *handles = (uint64_t*)(ptr + d->arrheap_start + d->columns[col].offset);
static inline const char *
getstr(const ds *d, uint64_t col, uint64_t index) {
const char *ptr = (const char *) d;
const uint64_t *handles = (const uint64_t *)(ptr + d->arrheap_start + d->columns[col].offset);
return ptr + d->strheap_start + handles[index];
}

@@ -1147,7 +1147,7 @@ uint64_t dset_innerjoin(const char *key, uint64_t dset_r, uint64_t dset_s)
// Populate fields from the first dataset R. Declare a stack-allocated
// dynamic array of structs which memoize the required column data
ds_column *col;
size_t colidx;
uint64_t colidx;
const char *colkey;

// Cache source column details (try to use stack version if possible)
@@ -1164,10 +1164,7 @@ uint64_t dset_innerjoin(const char *key, uint64_t dset_r, uint64_t dset_s)
// key is either target join key or not in other dataset, add now
// with correct type details
col = column_lookup(ds_r, colkey, &colidx);
if (!dset_addcol_array(
dset, colkey, abs_i8(col->type),
col->shape[0], col->shape[1], col->shape[2]
)) {
if (!dset_addcol_array(dset, colkey, abs_i8(col->type), col->shape)) {
nonfatal("dset_innerjoin: cannot add column %s to result dataset", colkey);
goto fail;
}
@@ -1185,10 +1182,7 @@ uint64_t dset_innerjoin(const char *key, uint64_t dset_r, uint64_t dset_s)
continue; // already added in previous loop
}
col = column_lookup(ds_s, colkey, &colidx);
if (!dset_addcol_array(
dset, colkey, abs_i8(col->type),
col->shape[0], col->shape[1], col->shape[2]
)) {
if (!dset_addcol_array(dset, colkey, abs_i8(col->type), col->shape)) {
nonfatal("dset_innerjoin: cannot add column %s to result dataset", colkey);
goto fail;
}
@@ -1355,7 +1349,7 @@ uint64_t dset_getsz(uint64_t dset, const char * colkey)
return d->nrow * type_size[abs_i8(c->type)] * stride(c);
}

uint32_t dset_getshp (uint64_t dset, const char * colkey)
uint64_t dset_getshp (uint64_t dset, const char * colkey)
{
ds *d = handle_lookup(dset, colkey, 0, 0);
ds_column *c = column_lookup(d, colkey, NULL);
@@ -1364,15 +1358,15 @@ uint32_t dset_getshp (uint64_t dset, const char * colkey)

// Each 16-bit field in the result is a member of the shape tuple (ordered by
// significance)
return c->shape[0] | c->shape[1] << 8 | c->shape[2] << 16;
return (uint64_t) c->shape[0] | (uint64_t) c->shape[1] << 16 | (uint64_t) c->shape[2] << 32;
}

int dset_addcol_scalar (uint64_t dset, const char * key, int type) {
return dset_addcol_array(dset, key, type, 0, 0, 0);
return dset_addcol_array(dset, key, type, NULL);
}


int dset_addcol_array (uint64_t dset, const char * key, int type, int shape0, int shape1, int shape2) {
int dset_addcol_array (uint64_t dset, const char * key, int type, const uint16_t *shape) {

if(!tcheck(type)) {
nonfatal("invalid column data type: %i (key %s)", type, key);
@@ -1382,7 +1376,7 @@ int dset_addcol_array (uint64_t dset, const char * key, int type, int shape0, in
uint64_t idx;
ds *d = handle_lookup(dset, "add column", 0, &idx);
if (!d) {
nonfatal("could not find dataset with handle %lu (adding column %s)", dset, key);
nonfatal("could not find dataset with handle %llu (adding column %s)", dset, key);
return 0;
}

@@ -1392,9 +1386,8 @@ int dset_addcol_array (uint64_t dset, const char * key, int type, int shape0, in
// hypothetical new column descriptor.
ds_column col;
col.type = ksz > SHORTKEYSZ ? -t : t;
col.shape[0] = (uint8_t) shape0;
col.shape[1] = (uint8_t) shape1;
col.shape[2] = (uint8_t) shape2;
col.shape[0] = 0; col.shape[1] = 0; col.shape[2] = 0;
for (int i = 0; shape != NULL && i < 3 && shape[i] != 0; i++) col.shape[i] = shape[i];

if (d->ncol == d->ccol) {

@@ -1685,7 +1678,7 @@ void dset_dumptxt (uint64_t dset, int dump_data) {

char buf[1000];

char * data = d;
char *data = (char *)d;
data += d->arrheap_start + c->offset;

#define REPR(sym,_a,type,_c,spec,reprfn) \
@@ -1727,7 +1720,7 @@ int dset_setstrheap(uint64_t dset, const char *heap, size_t size) {
d->strheap_sz = 1; // 1 for empty string
ht64_clear(&slot->ht);

char *s = heap;
const char *s = heap;
size_t len;
uint64_t idx;
while (d && s < heap + size) {
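dset_getshp now packs each of the (up to three) column dimensions into 16 bits of a uint64 instead of 8 bits of a uint32, so per-dimension sizes up to 65535 round-trip correctly. A small Python sketch of the decoding, mirroring Data.getshp in core.pyx above; the helper name is illustrative, not part of the commit.

```python
# Illustrative decoder for the widened dset_getshp packing (not from the commit).
def unpack_shape(packed: int) -> tuple:
    dims = (
        packed & 0xFFFF,          # shape[0]
        (packed >> 16) & 0xFFFF,  # shape[1]
        (packed >> 32) & 0xFFFF,  # shape[2]
    )
    # Zero dimensions are unused; scalar columns decode to an empty tuple
    return tuple(int(d) for d in dims if d)

assert unpack_shape(0) == ()
assert unpack_shape(3 | (2 << 16)) == (3, 2)
assert unpack_shape(4096 | (4096 << 16)) == (4096, 4096)  # > 255 now representable
```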