Skip to content

Commit

Permalink
Merge pull request #16 from Amulet-Team/nogil
Browse files Browse the repository at this point in the history
Nogil and cleanup
  • Loading branch information
gentlegiantJGC authored Nov 3, 2024
2 parents 8430c0e + fb6f86f commit ddae0b6
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 104 deletions.
2 changes: 1 addition & 1 deletion src/leveldb/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class LevelDB:
Set a value in the database.
"""

def put_batch(self, batch: dict[bytes, bytes | None]) -> None:
def put_batch(self, batch: collections.abc.Mapping[bytes, bytes]) -> None:
"""
Set a group of values in the database.
"""
Expand Down
161 changes: 101 additions & 60 deletions src/leveldb/__init__leveldb.py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <leveldb/write_batch.h>
#include <leveldb/zlib_compressor.h>


#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/typing.h>
Expand All @@ -21,6 +20,71 @@

namespace py = pybind11;

namespace PYBIND11_NAMESPACE {
namespace detail {
template <>
struct type_caster<leveldb::Slice> {
public:
PYBIND11_TYPE_CASTER(leveldb::Slice, const_name("bytes"));

bool load(handle src, bool)
{
PyObject* source = src.ptr();
if (!PyBytes_Check(source)) {
return false;
}
Py_ssize_t size = PyBytes_Size(src.ptr());
const char* buffer = PyBytes_AsString(src.ptr());
if (!buffer) {
return false;
}
value = leveldb::Slice(buffer, size);
return true;
}

// This causes a crash that I don't understand
// static handle cast(const leveldb::Slice& src, return_value_policy /* policy */, handle /* parent */)
//{
// return py::bytes(src.data(), src.size());
//}
};

template <>
struct type_caster<leveldb::WriteBatch> {
public:
PYBIND11_TYPE_CASTER(leveldb::WriteBatch, const_name("collections.abc.Mapping[bytes, bytes]"));

bool load(handle src, bool)
{
auto getitem = src.attr("__getitem__");
for (auto& key : src) {
if (!PyBytes_Check(key.ptr())) {
return false;
}
Py_ssize_t key_size = PyBytes_Size(key.ptr());
const char* key_buffer = PyBytes_AsString(key.ptr());
if (!key_buffer) {
return false;
}

auto val = getitem(key);
if (val.is_none()) {
value.Delete(leveldb::Slice(key_buffer, key_size));
} else {
Py_ssize_t val_size = PyBytes_Size(val.ptr());
const char* val_buffer = PyBytes_AsString(val.ptr());
if (!val_buffer) {
return false;
}
value.Put(leveldb::Slice(key_buffer, key_size), leveldb::Slice(val_buffer, val_size));
}
}
return true;
}
};
}
} // namespace PYBIND11_NAMESPACE::detail

namespace {

class LevelDBException : public std::runtime_error {
Expand Down Expand Up @@ -116,7 +180,6 @@ class LevelDBKeysIterator {

py::bytes next()
{
auto lock = iterator_ptr->lock_shared();
auto& iterator = *iterator_ptr;
if (!iterator) {
throw std::runtime_error("LevelDBIterator has been deleted.");
Expand Down Expand Up @@ -146,7 +209,6 @@ class LevelDBValuesIterator {

py::bytes next()
{
auto lock = iterator_ptr->lock_shared();
auto& iterator = *iterator_ptr;
if (!iterator) {
throw std::runtime_error("LevelDBIterator has been deleted.");
Expand Down Expand Up @@ -176,7 +238,6 @@ class LevelDBItemsIterator {

py::typing::Tuple<py::bytes, py::bytes> next()
{
auto lock = iterator_ptr->lock_shared();
auto& iterator = *iterator_ptr;
if (!iterator) {
throw std::runtime_error("LevelDBIterator has been deleted.");
Expand Down Expand Up @@ -211,7 +272,6 @@ class LevelDBItemsRangeIterator {

py::typing::Tuple<py::bytes, py::bytes> next()
{
auto lock = iterator_ptr->lock_shared();
auto& iterator = *iterator_ptr;
if (!iterator) {
throw std::runtime_error("LevelDBIterator has been deleted.");
Expand Down Expand Up @@ -257,7 +317,6 @@ void init_leveldb(py::module m)
LevelDBIterator.def(
"valid",
[](Amulet::LevelDBIterator& self) {
auto lock = self.lock_shared();
return self && self->Valid();
},
py::doc(
Expand All @@ -266,7 +325,6 @@ void init_leveldb(py::module m)
LevelDBIterator.def(
"seek_to_first",
[](Amulet::LevelDBIterator& self) {
auto lock = self.lock_shared();
if (!self) {
throw std::runtime_error("LevelDBIterator has been deleted.");
}
Expand All @@ -276,7 +334,6 @@ void init_leveldb(py::module m)
LevelDBIterator.def(
"seek_to_last",
[](Amulet::LevelDBIterator& self) {
auto lock = self.lock_shared();
if (!self) {
throw std::runtime_error("LevelDBIterator has been deleted.");
}
Expand All @@ -285,12 +342,11 @@ void init_leveldb(py::module m)
py::doc("Seek to the last entry in the database."));
LevelDBIterator.def(
"seek",
[](Amulet::LevelDBIterator& self, py::bytes target) {
auto lock = self.lock_shared();
[](Amulet::LevelDBIterator& self, leveldb::Slice target) {
if (!self) {
throw std::runtime_error("LevelDBIterator has been deleted.");
}
self->Seek(target.cast<std::string>());
self->Seek(target);
},
py::arg("target"),
py::doc(
Expand All @@ -299,7 +355,6 @@ void init_leveldb(py::module m)
LevelDBIterator.def(
"next",
[](Amulet::LevelDBIterator& self) {
auto lock = self.lock_shared();
if (!self) {
throw std::runtime_error("LevelDBIterator has been deleted.");
}
Expand All @@ -310,7 +365,6 @@ void init_leveldb(py::module m)
LevelDBIterator.def(
"prev",
[](Amulet::LevelDBIterator& self) {
auto lock = self.lock_shared();
if (!self) {
throw std::runtime_error("LevelDBIterator has been deleted.");
}
Expand All @@ -321,29 +375,27 @@ void init_leveldb(py::module m)
LevelDBIterator.def(
"key",
[](Amulet::LevelDBIterator& self) {
auto lock = self.lock_shared();
if (!self) {
throw std::runtime_error("LevelDBIterator has been deleted.");
}
if (!self->Valid()) {
throw std::runtime_error("LevelDBIterator does not point to a valid value.");
}
return py::bytes(self->key().ToString());
return py::bytes(self->key().data(), self->key().size());
},
py::doc(
"Get the key of the current entry in the database.\n"
":raises: runtime_error if iterator is not valid."));
LevelDBIterator.def(
"value",
[](Amulet::LevelDBIterator& self) {
auto lock = self.lock_shared();
if (!self) {
throw std::runtime_error("LevelDBIterator has been deleted.");
}
if (!self->Valid()) {
throw std::runtime_error("LevelDBIterator does not point to a valid value.");
}
return py::bytes(self->value().ToString());
return py::bytes(self->value().data(), self->value().size());
},
py::doc(
"Get the value of the current entry in the database.\n"
Expand All @@ -367,29 +419,30 @@ void init_leveldb(py::module m)
LevelDB.def(
"close",
&Amulet::LevelDB::close,
py::arg("compact") = false,
py::doc(
"Close the leveldb database.\n"
"\n"
":param compact: If True will compact the database making it take less memory."));
"Only the owner of the database may close it.\n"
"If needed, an external lock must be used to ensure that no other threads are accessing the database."
),
py::call_guard<py::gil_scoped_release>());

LevelDB.def(
"compact",
[](Amulet::LevelDB& self) {
auto lock = self.lock_shared();
py::gil_scoped_release gil;
if (!self) {
throw std::runtime_error("The LevelDB database has been closed.");
}
self->CompactRange(nullptr, nullptr);
},
py::doc("Remove deleted entries from the database to reduce its size."));

auto put = [](Amulet::LevelDB& self, py::bytes key, py::bytes value) {
auto lock = self.lock_shared();
auto put = [](Amulet::LevelDB& self, leveldb::Slice key, leveldb::Slice value) {
py::gil_scoped_release gil;
if (!self) {
throw std::runtime_error("The LevelDB database has been closed.");
}
auto status = self->Put(self.get_write_options(), key.cast<std::string>(), value.cast<std::string>());
auto status = self->Put(self.get_write_options(), key, value);
if (!status.ok()) {
throw LevelDBException(status.ToString());
}
Expand All @@ -399,19 +452,11 @@ void init_leveldb(py::module m)

LevelDB.def(
"put_batch",
[](Amulet::LevelDB& self, py::typing::Dict<py::bytes, std::optional<py::bytes>> py_data) {
auto lock = self.lock_shared();
[](Amulet::LevelDB& self, leveldb::WriteBatch batch) {
py::gil_scoped_release gil;
if (!self) {
throw std::runtime_error("The LevelDB database has been closed.");
}
leveldb::WriteBatch batch;
for (auto& item : py_data) {
if (item.second.is_none()) {
batch.Delete(item.first.cast<std::string>());
} else {
batch.Put(item.first.cast<std::string>(), item.second.cast<std::string>());
}
}
leveldb::Status status = self->Write(self.get_write_options(), &batch);
if (!status.ok()) {
throw LevelDBException(status.ToString());
Expand All @@ -422,28 +467,31 @@ void init_leveldb(py::module m)

LevelDB.def(
"__contains__",
[](Amulet::LevelDB& self, py::bytes key) {
auto lock = self.lock_shared();
[](Amulet::LevelDB& self, leveldb::Slice key) {
py::gil_scoped_release gil;
if (!self) {
throw std::runtime_error("The LevelDB database has been closed.");
}
std::string value;
return self->Get(self.get_read_options(), key.cast<std::string>(), &value).ok();
return self->Get(self.get_read_options(), key, &value).ok();
},
py::arg("key"));

auto get = [](Amulet::LevelDB& self, py::bytes key) {
auto lock = self.lock_shared();
if (!self) {
throw std::runtime_error("The LevelDB database has been closed.");
}
auto get = [](Amulet::LevelDB& self, leveldb::Slice key) {
std::string value;
auto status = self->Get(self.get_read_options(), key.cast<std::string>(), &value);
leveldb::Status status;
{
py::gil_scoped_release gil;
if (!self) {
throw std::runtime_error("The LevelDB database has been closed.");
}
status = self->Get(self.get_read_options(), key, &value);
}
switch (status.code()) {
case leveldb::Status::kOk:
return py::bytes(value);
case leveldb::Status::kNotFound:
throw py::key_error(key);
throw py::key_error(key.ToString());
default:
throw LevelDBException(status.ToString());
}
Expand All @@ -461,12 +509,12 @@ void init_leveldb(py::module m)
":raises: LevelDBException on other error."));
LevelDB.def("__getitem__", get, py::arg("key"));

auto del = [](Amulet::LevelDB& self, py::bytes key) {
auto lock = self.lock_shared();
auto del = [](Amulet::LevelDB& self, leveldb::Slice key) {
py::gil_scoped_release gil;
if (!self) {
throw std::runtime_error("The LevelDB database has been closed.");
}
auto status = self->Delete(self.get_write_options(), key.cast<std::string>());
auto status = self->Delete(self.get_write_options(), key);
if (!status.ok()) {
throw LevelDBException(status.ToString());
}
Expand All @@ -492,7 +540,6 @@ void init_leveldb(py::module m)
Amulet::LevelDB& self,
std::optional<py::bytes> start,
std::optional<py::bytes> end) {
auto lock = self.lock_shared();
if (!self) {
throw std::runtime_error("The LevelDB database has been closed.");
}
Expand All @@ -506,12 +553,10 @@ void init_leveldb(py::module m)

if (end) {
return pybind11_extensions::make_iterator(
LevelDBItemsRangeIterator(std::move(iterator_ptr), end->cast<std::string>())
);
LevelDBItemsRangeIterator(std::move(iterator_ptr), end->cast<std::string>()));
} else {
return pybind11_extensions::make_iterator(
LevelDBItemsIterator(std::move(iterator_ptr))
);
LevelDBItemsIterator(std::move(iterator_ptr)));
}
},
py::arg("start") = py::none(),
Expand All @@ -529,8 +574,7 @@ void init_leveldb(py::module m)
auto& iterator = *iterator_ptr;
iterator->SeekToFirst();
return pybind11_extensions::make_iterator(
LevelDBKeysIterator(std::move(iterator_ptr))
);
LevelDBKeysIterator(std::move(iterator_ptr)));
});
LevelDB.def(
"keys",
Expand All @@ -539,8 +583,7 @@ void init_leveldb(py::module m)
auto& iterator = *iterator_ptr;
iterator->SeekToFirst();
return pybind11_extensions::make_iterator(
LevelDBKeysIterator(std::move(iterator_ptr))
);
LevelDBKeysIterator(std::move(iterator_ptr)));
},
py::doc("An iterable of all keys in the database."));

Expand All @@ -551,8 +594,7 @@ void init_leveldb(py::module m)
auto& iterator = *iterator_ptr;
iterator->SeekToFirst();
return pybind11_extensions::make_iterator(
LevelDBValuesIterator(std::move(iterator_ptr))
);
LevelDBValuesIterator(std::move(iterator_ptr)));
},
py::doc("An iterable of all values in the database."));

Expand All @@ -563,8 +605,7 @@ void init_leveldb(py::module m)
auto& iterator = *iterator_ptr;
iterator->SeekToFirst();
return pybind11_extensions::make_iterator(
LevelDBItemsIterator(std::move(iterator_ptr))
);
LevelDBItemsIterator(std::move(iterator_ptr)));
},
py::doc("An iterable of all items in the database."));
}
Expand Down
Loading

0 comments on commit ddae0b6

Please sign in to comment.