Skip to content

Commit

Permalink
Buffered reads/writes for hint generation with shared memory (#1127)
Browse files Browse the repository at this point in the history
This PR adds implementation for buffered reads/writes from and to the
shared memory ringbuffer during hint generation. The previous
implementation would synchronize reads and writes at the granularity of
one byte which is not ideal for performance. In this PR, we add support
for synchronization across reads and writes of larger sizes. Both the
ringbuffer-based proof trace reader and writer classes are updated to
buffer incoming reads and writes: They collect data into an internal
buffer and only interacting with the shared memory ringbuffer when that
buffer is full in case of the writer or empty in case of the reader.
  • Loading branch information
theo25 authored Aug 6, 2024
1 parent c79bfd3 commit f7f3f29
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 66 deletions.
177 changes: 141 additions & 36 deletions include/kllvm/binary/deserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,68 +210,172 @@ class proof_trace_ringbuffer : public proof_trace_buffer {
shm_ringbuffer *shm_buffer_;
sem_t *data_avail_;
sem_t *space_avail_;

std::deque<uint8_t> peek_data_;
std::array<uint8_t, shm_ringbuffer::buffered_access_sz> buffer_;
size_t buffer_data_size_{0};
size_t buffer_data_start_{0};

bool peek_eof_{false};
bool buffered_eof_{false};
bool read_eof_{false};

bool read(uint8_t *ptr, size_t len = 1) {
for (size_t i = 0; i < len; i++) {
if (!peek_data_.empty()) {
ptr[i] = peek_data_.front();
peek_data_.pop_front();
continue;
}
// Helper function that reads from the shared memory ringbuffer into the
// buffer_. It tries to read shm_ringbuffer::buffered_access_sz bytes of data.
// It assumes that buffer_ is empty.
void read_from_shared_memory() {
assert(buffer_data_size_ == 0);
assert(buffer_data_start_ == 0);

if (peek_eof_) {
read_eof_ = true;
return false;
}
if (buffered_eof_) {
return;
}

if (read_eof_) {
return false;
sem_wait(data_avail_);

if (shm_buffer_->eof()) {
// EOF has been written to the ringbuffer. Check if this is the last chunk
// of data to be read.
size_t remaining_data_sz = shm_buffer_->data_size();
if (remaining_data_sz < shm_ringbuffer::buffered_access_sz) {
// This is the last chunk of data to be read from the ringbuffer.
shm_buffer_->get(buffer_.data(), remaining_data_sz);
sem_post(space_avail_);

buffer_data_size_ = remaining_data_sz;
buffered_eof_ = true;

return;
}
}

// Else, either EOF has not been written or EOF has been written but there
// are remaining full chunks to be read. In any case, we can read a full
// chunk.
shm_buffer_->get(buffer_.data());
sem_post(space_avail_);

buffer_data_size_ = shm_ringbuffer::buffered_access_sz;
}

bool read(uint8_t *ptr, size_t len = 1) {
// Check if we have read EOF already.
if (read_eof_) {
return false;
}

// Consume peeked data.
while (len > 0 && !peek_data_.empty()) {
*ptr = peek_data_.front();
peek_data_.pop_front();
ptr++;
len--;
}

while (int wait_status = sem_trywait(data_avail_)) {
assert(wait_status == -1 && errno == EAGAIN);
if (shm_buffer_->eof()) {
// Consume peeked EOF.
if (len > 0 && peek_eof_) {
read_eof_ = true;
return false;
}

// Peeked data has been fully consumed. If more data is requested, we need
// to read from buffer_ and/or shared memory.
while (len > 0) {
// If buffer_ is empty, try to fetch more data from the shared memory
// ringbuffer.
if (buffer_data_size_ == 0) {
// Check for and conusme buffered EOF.
if (buffered_eof_) {
read_eof_ = true;
return false;
}

assert(buffer_data_start_ == 0);
read_from_shared_memory();
}

// Read available data from the buffer_.
assert(buffer_data_start_ < shm_ringbuffer::buffered_access_sz);
assert(
buffer_data_start_ + buffer_data_size_
<= shm_ringbuffer::buffered_access_sz);
size_t size_to_read_from_buffer = std::min(len, buffer_data_size_);
memcpy(
ptr, buffer_.data() + buffer_data_start_, size_to_read_from_buffer);
ptr += size_to_read_from_buffer;
len -= size_to_read_from_buffer;

buffer_data_start_ += size_to_read_from_buffer;
buffer_data_size_ -= size_to_read_from_buffer;
if (buffer_data_start_ == shm_ringbuffer::buffered_access_sz) {
assert(buffer_data_size_ == 0);
buffer_data_start_ = 0;
}
shm_buffer_->get(&ptr[i]);
sem_post(space_avail_);
}

assert(len == 0);
return true;
}

bool peek(uint8_t *ptr, size_t len = 1) {
for (size_t i = 0; i < len; i++) {
if (i < peek_data_.size()) {
ptr[i] = peek_data_[i];
continue;
}
// Check if we have read EOF already.
if (read_eof_) {
return false;
}

if (peek_eof_) {
return false;
}
// Copy already peeked data.
size_t i = 0;
while (len > 0 && i < peek_data_.size()) {
*ptr = peek_data_[i];
ptr++;
i++;
len--;
}

if (read_eof_) {
return false;
}
// Check for already peeked EOF.
if (len > 0 && peek_eof_) {
return false;
}

while (int wait_status = sem_trywait(data_avail_)) {
assert(wait_status == -1 && errno == EAGAIN);
if (shm_buffer_->eof()) {
// Already peeked data has been fully copied. If more data is requested, we
// need to peek from buffer_ and/or shared memory.
while (len > 0) {
// If buffer_ is empty, try to fetch more data from the shared memory
// ringbuffer.
if (buffer_data_size_ == 0) {
// Check for buffered EOF.
if (buffered_eof_) {
peek_eof_ = true;
return false;
}

assert(buffer_data_start_ == 0);
read_from_shared_memory();
}

// Peek available data from the buffer_.
assert(buffer_data_start_ < shm_ringbuffer::buffered_access_sz);
assert(
buffer_data_start_ + buffer_data_size_
<= shm_ringbuffer::buffered_access_sz);
size_t size_to_peek_from_buffer = std::min(len, buffer_data_size_);
memcpy(
ptr, buffer_.data() + buffer_data_start_, size_to_peek_from_buffer);
peek_data_.insert(
peek_data_.end(), buffer_.begin() + buffer_data_start_,
buffer_.begin() + buffer_data_start_ + size_to_peek_from_buffer);
ptr += size_to_peek_from_buffer;
len -= size_to_peek_from_buffer;

buffer_data_start_ += size_to_peek_from_buffer;
buffer_data_size_ -= size_to_peek_from_buffer;
if (buffer_data_start_ == shm_ringbuffer::buffered_access_sz) {
assert(buffer_data_size_ == 0);
buffer_data_start_ = 0;
}
shm_buffer_->get(&ptr[i]);
sem_post(space_avail_);
peek_data_.push_back(ptr[i]);
}

assert(len == 0);
return true;
}

Expand All @@ -280,7 +384,8 @@ class proof_trace_ringbuffer : public proof_trace_buffer {
void *shm_object, sem_t *data_avail, sem_t *space_avail)
: shm_buffer_(static_cast<shm_ringbuffer *>(shm_object))
, data_avail_(data_avail)
, space_avail_(space_avail) {
, space_avail_(space_avail)
, buffer_() {
new (shm_buffer_) shm_ringbuffer;
}

Expand Down
15 changes: 9 additions & 6 deletions include/kllvm/binary/ringbuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ class shm_ringbuffer {
// empty, we maintain the invariant that the capacity of the buffer is one byte
// less than its size. This way, the buffer is empty iff read_pos == write_pos,
// and it is full iff read_pos == (write_pos+1)%size.
static constexpr size_t size = 1024;
static constexpr size_t size = 4096;

// Ringbuffer capacity in bytes.
// As commented above, the capacity is always equal to size-1.
static constexpr size_t capacity = size - 1;

// The default size in bytes for put and get operations.
static constexpr size_t buffered_access_sz = 64;
static_assert(buffered_access_sz <= capacity);

private:
bool eof_{false};
size_t read_pos_{0};
Expand All @@ -42,23 +46,22 @@ class shm_ringbuffer {
// undefined behavior.
void put_eof();

// Returns true when the ringbuffer is empty and the EOF has been written, and
// false otherwise. As commented above, the ringbuffer is empty iff
// read_pos == write_pos.
// Returns true when eof has been written in the ringbuffer. At that point,
// the ringbuffer may still contain data, but no further writes can happen.
[[nodiscard]] bool eof() const;

// Add data to the ringbuffer. The behavior is undefined if the buffer does not
// have enough remaining space to fit the data or if EOF has been written to the
// ringbuffer. The behavior is also undefined if the data pointer passed to this
// function does not point to a contiguous memory chunk of the corresponding
// size.
void put(uint8_t const *data, size_t count = 1);
void put(uint8_t const *data, size_t count = buffered_access_sz);

// Get and remove data from the ringbuffer. The behavior is undefined if more
// data is requested than it is currently available in the ringbuffer. The
// behavior is also undefined if the data pointer passed to this function does
// not point to a contiguous memory chunk of the corresponding size.
void get(uint8_t *data, size_t count = 1);
void get(uint8_t *data, size_t count = buffered_access_sz);
};

} // namespace kllvm
Expand Down
6 changes: 5 additions & 1 deletion include/kllvm/binary/serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,18 @@ class proof_trace_ringbuffer_writer : public proof_trace_writer {
sem_t *data_avail_;
sem_t *space_avail_;

std::array<uint8_t, shm_ringbuffer::buffered_access_sz> buffer_;
size_t buffer_data_size_{0};

void write(uint8_t const *ptr, size_t len = 1);

public:
proof_trace_ringbuffer_writer(
void *shm_object, sem_t *data_avail, sem_t *space_avail)
: shm_buffer_(reinterpret_cast<shm_ringbuffer *>(shm_object))
, data_avail_(data_avail)
, space_avail_(space_avail) { }
, space_avail_(space_avail)
, buffer_() { }

~proof_trace_ringbuffer_writer() override { shm_buffer_->~shm_ringbuffer(); }

Expand Down
9 changes: 1 addition & 8 deletions lib/binary/ringbuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,7 @@ void shm_ringbuffer::put_eof() {
}

bool shm_ringbuffer::eof() const {
// NOTE: for synchronization purposes, it is important that the eof_ field is
// checked first. Typically, the reader process will call this, so we want to
// avoid a race where the writer updates buf.writer_pos after the reader has
// accessed it but before the reader has fully evaluated the condition. If
// eof_ is checked first, and due to short-circuiting, we know that if eof_ is
// true, the writer will not do any further updates to the write_pos_ field,
// and if eof_ is false, the reader will not access write_pos_ at all.
return eof_ && write_pos_ == read_pos_;
return eof_;
}

void shm_ringbuffer::put(uint8_t const *data, size_t count) {
Expand Down
30 changes: 26 additions & 4 deletions lib/binary/serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,26 @@ void proof_trace_file_writer::write_string(char const *str) {
}

void proof_trace_ringbuffer_writer::write(uint8_t const *ptr, size_t len) {
for (size_t i = 0; i < len; i++) {
sem_wait(space_avail_);
shm_buffer_->put(&ptr[i]);
sem_post(data_avail_);
while (len > 0) {
// Write to the buffer.
assert(buffer_data_size_ < shm_ringbuffer::buffered_access_sz);
size_t size_to_write_to_buffer
= std::min(len, shm_ringbuffer::buffered_access_sz - buffer_data_size_);
memcpy(buffer_.data() + buffer_data_size_, ptr, size_to_write_to_buffer);

ptr += size_to_write_to_buffer;
buffer_data_size_ += size_to_write_to_buffer;
len -= size_to_write_to_buffer;

// If the buffer is full, write its data to the shared memory ringbuffer and
// empty it.
if (buffer_data_size_ == shm_ringbuffer::buffered_access_sz) {
sem_wait(space_avail_);
shm_buffer_->put(buffer_.data());
sem_post(data_avail_);

buffer_data_size_ = 0;
}
}
}

Expand All @@ -246,7 +262,13 @@ void proof_trace_ringbuffer_writer::write_string(char const *str) {
}

void proof_trace_ringbuffer_writer::write_eof() {
sem_wait(space_avail_);
// Write any remaining buffer contents to the ringbuffer before writing EOF.
shm_buffer_->put(buffer_.data(), buffer_data_size_);
shm_buffer_->put_eof();
sem_post(data_avail_);

buffer_data_size_ = 0;
}

} // namespace kllvm
21 changes: 13 additions & 8 deletions tools/kore-proof-trace/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ cl::opt<bool> use_shared_memory(
exit(1); \
} while (0)

static constexpr mode_t perms
= S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;

// NOLINTNEXTLINE(*-cognitive-complexity)
int main(int argc, char **argv) {
cl::HideUnrelatedOptions({&kore_proof_trace_cat});
Expand Down Expand Up @@ -86,8 +89,7 @@ int main(int argc, char **argv) {
shm_unlink(input_filename.c_str());

// Open shared memory object
int fd = shm_open(
input_filename.c_str(), O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
int fd = shm_open(input_filename.c_str(), O_CREAT | O_EXCL | O_RDWR, perms);
if (fd == -1) {
ERR_EXIT("shm_open reader");
}
Expand All @@ -114,16 +116,16 @@ int main(int argc, char **argv) {
sem_unlink(space_avail_sem_name.c_str());

// Initialize semaphores
// NOLINTNEXTLINE(*-pro-type-vararg)
sem_t *data_avail = sem_open(
data_avail_sem_name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, 0);
sem_t *data_avail
// NOLINTNEXTLINE(*-pro-type-vararg)
= sem_open(data_avail_sem_name.c_str(), O_CREAT | O_EXCL, perms, 0);
if (data_avail == SEM_FAILED) {
ERR_EXIT("sem_init data_avail reader");
}
// NOLINTNEXTLINE(*-pro-type-vararg)
sem_t *space_avail = sem_open(
space_avail_sem_name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR,
shm_ringbuffer::capacity);
space_avail_sem_name.c_str(), O_CREAT | O_EXCL, perms,
shm_ringbuffer::capacity / shm_ringbuffer::buffered_access_sz);
if (space_avail == SEM_FAILED) {
ERR_EXIT("sem_init space_avail reader");
}
Expand All @@ -132,7 +134,10 @@ int main(int argc, char **argv) {
auto trace = parser.parse_proof_trace_from_shmem(
shm_object, data_avail, space_avail);

// Close semaphores
// Close the shared memory object and semaphores
if (close(fd) == -1) {
ERR_EXIT("close shm object reader");
}
if (sem_close(data_avail) == -1) {
ERR_EXIT("sem_close data reader");
}
Expand Down
Loading

0 comments on commit f7f3f29

Please sign in to comment.