Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support cleaning up spare examples correctly in read_span_flatbuffer() #4684

Merged
merged 12 commits into from
Feb 15, 2024
Merged
3 changes: 3 additions & 0 deletions vowpalwabbit/core/include/vw/core/error_data.h
Original file line number Diff line number Diff line change
@@ -25,6 +25,9 @@ ERROR_CODE_DEFINITION(
ERROR_CODE_DEFINITION(
13, fb_parser_size_mismatch_ft_names_ft_values, "Size of feature names and feature values do not match. ")
ERROR_CODE_DEFINITION(14, unknown_label_type, "Label type in Flatbuffer not understood. ")
ERROR_CODE_DEFINITION(15, fb_parser_span_misaligned, "Input Flatbuffer span is not aligned to an 8-byte boundary. ")
ERROR_CODE_DEFINITION(
16, fb_parser_span_length_mismatch, "Input Flatbuffer span does not match flatbuffer size prefix. ")

// TODO: This is temporary until we switch to the new error handling mechanism.
ERROR_CODE_DEFINITION(10000, vw_exception, "vw_exception: ")
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@

#pragma once

#include "vw/core/api_status.h"
#include "vw/core/example.h"
#include "vw/core/multi_ex.h"
#include "vw/core/shared_data.h"
@@ -14,15 +13,21 @@
namespace VW
{

namespace experimental
{
class api_status;
}

using example_sink_f = std::function<void(VW::multi_ex&& spare_examples)>;

namespace parsers
{
namespace flatbuffer
{
int flatbuffer_to_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& examples);
bool read_span_flatbuffer(
VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, VW::multi_ex& examples);

int read_span_flatbuffer(VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory,
VW::multi_ex& examples, example_sink_f example_sink = nullptr, VW::experimental::api_status* status = nullptr);

class parser
{
@@ -57,6 +62,19 @@ class parser
VW::experimental::api_status* status = nullptr);
int get_namespace_index(const Namespace* ns, namespace_index& ni, VW::experimental::api_status* status = nullptr);

// Puts the parser back into its "not inside a multi-example" state: clears the
// pointer to the current multi-example object, lowers the active flag and
// rewinds the index within the multi-example to zero.
inline void reset_active_multi_ex()
{
  _multi_example_object = nullptr;
  _active_multi_ex = false;
  _multi_ex_index = 0;
}

// Puts the parser back into its "not inside an example collection" state:
// lowers the active-collection flag and rewinds the example index to zero.
inline void reset_active_collection()
{
  _active_collection = false;
  _example_index = 0;
}

void parse_simple_label(shared_data* sd, polylabel* l, reduction_features* red_features, const SimpleLabel* label);
void parse_cb_label(polylabel* l, const CBLabel* label);
void parse_ccb_label(polylabel* l, const CCBLabel* label);
130 changes: 80 additions & 50 deletions vowpalwabbit/fb_parser/src/parse_example_flatbuffer.cc
Original file line number Diff line number Diff line change
@@ -5,12 +5,14 @@
#include "vw/fb_parser/parse_example_flatbuffer.h"

#include "vw/core/action_score.h"
#include "vw/core/api_status.h"
#include "vw/core/best_constant.h"
#include "vw/core/cb.h"
#include "vw/core/constant.h"
#include "vw/core/error_constants.h"
#include "vw/core/global_data.h"
#include "vw/core/parser.h"
#include "vw/core/scope_exit.h"
#include "vw/core/vw.h"

#include <cfloat>
@@ -43,8 +45,8 @@ int flatbuffer_to_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& exampl
return static_cast<int>(status.get_error_code() == VW::experimental::error_code::success);
}

bool read_span_flatbuffer(
VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, VW::multi_ex& examples)
int read_span_flatbuffer(VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory,
VW::multi_ex& examples, example_sink_f example_sink, VW::experimental::api_status* status)
{
// we expect context to contain a size_prefixed flatbuffer (technically a binary string)
// which means:
@@ -59,16 +61,15 @@ bool read_span_flatbuffer(
// thus context.size() = sizeof(length) + length
io_buf unused;

// TODO: How do we report errors out of here? (This is a general API problem with the parsers)
size_t address = reinterpret_cast<size_t>(span);
if (address % 8 != 0)
{
std::stringstream sstream;
sstream << "fb_parser error: flatbuffer data not aligned to 8 bytes" << std::endl;
sstream << " span => @" << std::hex << address << std::dec << " % " << 8 << " = " << address % 8
<< " (vs desired = " << 0 << ")";
THROW(sstream.str());
return false;

RETURN_ERROR_LS(status, fb_parser_span_misaligned) << sstream.str();
}

flatbuffers::uoffset_t flatbuffer_object_size =
@@ -79,42 +80,80 @@ bool read_span_flatbuffer(
sstream << "fb_parser error: flatbuffer size prefix does not match actual size" << std::endl;
sstream << " span => @" << std::hex << address << std::dec << " size_prefix = " << flatbuffer_object_size
<< " length = " << length;
THROW(sstream.str());
return false;

RETURN_ERROR_LS(status, fb_parser_span_length_mismatch) << sstream.str();
}

VW::multi_ex temp_ex;
temp_ex.push_back(&example_factory());

// Use scope_exit because the parser reports errors by throwing exceptions (the code path in the vw driver
// uses the return value to signal completion, not errors).
auto scope_guard = VW::scope_exit(
[&temp_ex, &all, &example_sink]()
{
if (example_sink == nullptr) { VW::finish_example(*all, temp_ex); }
else { example_sink(std::move(temp_ex)); }
});

// There is a bit of unhappiness with the interface of the read_XYZ_<format>() functions, because they often
// expect the input multi_ex to have a single "empty" example there. This contributes, in part, to the large
// proliferation of entry points into the JSON parser(s). We want to avoid exposing that insofar as possible,
// so we will check whether we already received a perfectly good example and use that, or create a new one if
// needed.
if (examples.size() > 0)
{
assert(examples.size() == 1);
temp_ex.push_back(examples[0]);
examples.pop_back();
}
else { temp_ex.push_back(&example_factory()); }

bool has_more = true;
VW::experimental::api_status status;
do {
switch (all->parser_runtime.flat_converter->parse_examples(all, unused, temp_ex, span, &status))
switch (int result = all->parser_runtime.flat_converter->parse_examples(all, unused, temp_ex, span, status))
{
case VW::experimental::error_code::success:
has_more = true;
break;
// Because nothing_to_parse is not an error we have to filter it out here, otherwise
// we could simply do RETURN_IF_FAIL(result) and let the macro handle it.
case VW::experimental::error_code::nothing_to_parse:
has_more = false;
break;
default:
std::stringstream sstream;
sstream << "Error parsing examples: " << std::endl;
THROW(sstream.str());
return false;
RETURN_IF_FAIL(result);
}

// The underlying parser will emit a newline example when terminating the parsing
// of a multi_ex block. Since we are collecting it into a multi_ex, we want to
// swallow it here, but should the parser not have followed its contract w.r.t.
// the return value, we should use the presence of the newline example to override
// has_more.
has_more &= !temp_ex[0]->is_newline;

// If this is a real example, we need to move it to the output multi_ex; we also
// need to create a new example to replace it for the next run through the parser.
if (!temp_ex[0]->is_newline)
{
examples.push_back(&example_factory());
std::swap(examples[examples.size() - 1], temp_ex[0]);
// We avoid doing moves or copy construction here because multi_ex contains
// example pointers. The static_assert below is meant to call attention to
// this spot if the underlying element type changes.
using temp_ex_element_t = std::remove_reference<decltype(temp_ex[0])>::type;
using examples_element_t = std::remove_reference<decltype(examples[0])>::type;

static_assert(std::is_same<temp_ex_element_t, examples_element_t>::value &&
std::is_same<temp_ex_element_t, VW::example*>::value,
"temp_ex and example must be vector-like over VW::example*");

examples.push_back(temp_ex[0]);

// Since we are using a vector of pointers, we can simply reassign the slot to
// the pointer of the newly created destination example for the parser.
temp_ex[0] = &example_factory();
}
} while (has_more);

VW::finish_example(*all, temp_ex);
return true;
return VW::experimental::error_code::success;
}

const VW::parsers::flatbuffer::ExampleRoot* parser::data() { return _data; }
@@ -198,16 +237,17 @@ int parser::process_collection_item(VW::workspace* all, VW::multi_ex& examples,
{
_active_multi_ex = true;
_multi_example_object = _data->example_obj_as_ExampleCollection()->multi_examples()->Get(_example_index);

// read from active multi_ex
RETURN_IF_FAIL(parse_multi_example(all, examples[0], _multi_example_object, status));
// read from active collection

// if we are done with the multi example, move to the next one, or finish the collection
if (!_active_multi_ex)
{
_example_index++;
if (_example_index == _data->example_obj_as_ExampleCollection()->multi_examples()->size())
{
_example_index = 0;
_active_collection = false;
reset_active_collection();
}
}
}
@@ -216,11 +256,7 @@ int parser::process_collection_item(VW::workspace* all, VW::multi_ex& examples,
const auto ex = _data->example_obj_as_ExampleCollection()->examples()->Get(_example_index);
RETURN_IF_FAIL(parse_example(all, examples[0], ex, status));
_example_index++;
if (_example_index == _data->example_obj_as_ExampleCollection()->examples()->size())
{
_example_index = 0;
_active_collection = false;
}
if (_example_index == _data->example_obj_as_ExampleCollection()->examples()->size()) { reset_active_collection(); }
}
return VW::experimental::error_code::success;
}
@@ -231,6 +267,20 @@ int parser::parse_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& exampl
#define RETURN_SUCCESS_FINISHED() \
return buffer_pointer ? VW::experimental::error_code::nothing_to_parse : VW::experimental::error_code::success;

// If we are re-using a single parser instance across multiple invocations, we need to reset
// the state when we get a new buffer_pointer. Otherwise we may be in the middle of a multi_ex
// or example_collection, and the following parse will attempt to reuse the object references
// from the previous buffer, which may have been deallocated.
// TODO: Rewrite the parser to avoid this convoluted, re-entrant logic.
if (buffer_pointer && _flatbuffer_pointer != buffer_pointer)
{
reset_active_multi_ex();
reset_active_collection();
}

// The ExampleCollection processing code owns dispatching to parse_multi_example to handle
// iteration through the outer collection correctly, thus it must have the first chance to
// handle the incoming parse request.
if (_active_collection)
{
RETURN_IF_FAIL(process_collection_item(all, examples, status));
@@ -307,9 +357,7 @@ int parser::parse_multi_example(
{
// done with multi example, send a newline example and reset
ae->is_newline = true;
_multi_ex_index = 0;
_active_multi_ex = false;
_multi_example_object = nullptr;
reset_active_multi_ex();
return VW::experimental::error_code::success;
}

@@ -325,30 +373,11 @@ int parser::get_namespace_index(const Namespace* ns, namespace_index& ni, VW::ex
ni = static_cast<uint8_t>(ns->name()->c_str()[0]);
return VW::experimental::error_code::success;
}
else if (flatbuffers::IsFieldPresent(ns, Namespace::VT_HASH))
else
{
ni = ns->hash();
return VW::experimental::error_code::success;
}

if (_active_collection && _active_multi_ex)
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index in collection item with example "
"index "
<< _example_index << "and multi example index " << _multi_ex_index;
}
else if (_active_multi_ex)
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index in multi example index "
<< _multi_ex_index;
}
else
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index";
}
}

bool get_namespace_hash(VW::workspace* all, const Namespace* ns, uint64_t& hash)
@@ -462,7 +491,7 @@ int parser::parse_namespaces(VW::workspace* all, example* ae, const Namespace* n
}
else
{
if (!has_hashes) { RETURN_NS_PARSER_ERROR(status, fb_parser_name_hash_missing) }
if (!has_hashes) { RETURN_NS_PARSER_ERROR(status, fb_parser_feature_hashes_names_missing) }

if (ns->feature_hashes()->size() != ns->feature_values()->size())
{
@@ -541,6 +570,7 @@ int parser::parse_flat_label(
break;
}
case Label_NONE:
case Label_no_label:
break;
default:
if (_active_collection && _active_multi_ex)
1 change: 1 addition & 0 deletions vowpalwabbit/fb_parser/src/parse_label.cc
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
// license as described in the file LICENSE.

#include "vw/core/action_score.h"
#include "vw/core/api_status.h"
#include "vw/core/best_constant.h"
#include "vw/core/cb.h"
#include "vw/core/constant.h"
Loading

Unchanged files with check annotations Beta

else { shrink_factors.resize(preds.size(), 1.f); }
}
template class cb_explore_adf_large_action_space<one_pass_svd_impl, one_rank_spanner_state>;

Check warning on line 266 in vowpalwabbit/core/src/reductions/cb/cb_explore_adf_large_action_space.cc

GitHub Actions / windows-latest-vcpkg-debug-Vcpkg build

'void VW::cb_explore_adf::cb_explore_adf_large_action_space<VW::cb_explore_adf::one_pass_svd_impl,VW::cb_explore_adf::one_rank_spanner_state>::save_load(VW::io_buf &,bool,bool)': no suitable definition provided for explicit template instantiation request
template class cb_explore_adf_large_action_space<two_pass_svd_impl, one_rank_spanner_state>;
} // namespace cb_explore_adf
} // namespace VW
VW::workspace* all = nullptr;
try
{
all = VW::initialize(std::move(options));

Check warning on line 32 in utl/flatbuffer/txt_to_flat.cc

GitHub Actions / asan.ubuntu-latest.vcpkg-asan-debug

‘VW::workspace* VW::initialize(std::unique_ptr<VW::config::options_i, void (*)(VW::config::options_i*)>, VW::io_buf*, bool, VW::trace_message_t, void*)’ is deprecated: Replaced with new unique_ptr based overload. [-Wdeprecated-declarations]

Check warning on line 32 in utl/flatbuffer/txt_to_flat.cc

GitHub Actions / asan.macos-latest.vcpkg-ubsan-debug

'initialize' is deprecated: Replaced with new unique_ptr based overload. [-Wdeprecated-declarations]

Check warning on line 32 in utl/flatbuffer/txt_to_flat.cc

GitHub Actions / asan.macos-latest.vcpkg-asan-debug

'initialize' is deprecated: Replaced with new unique_ptr based overload. [-Wdeprecated-declarations]
}
catch (const std::exception& ex)
{
// Create namespace when audit is false
flatbuffers::Offset<VW::parsers::flatbuffer::Namespace> to_flat::create_namespace(
features::const_iterator begin, features::const_iterator end, VW::namespace_index index, uint64_t hash)

Check warning on line 345 in utl/flatbuffer/vw_to_flat.cc

GitHub Actions / asan.macos-latest.vcpkg-ubsan-debug

'features' is deprecated: Moved into VW namespace. Will be removed in VW 10. [-Wdeprecated-declarations]

Check warning on line 345 in utl/flatbuffer/vw_to_flat.cc

GitHub Actions / asan.macos-latest.vcpkg-ubsan-debug

'features' is deprecated: Moved into VW namespace. Will be removed in VW 10. [-Wdeprecated-declarations]

Check warning on line 345 in utl/flatbuffer/vw_to_flat.cc

GitHub Actions / asan.macos-latest.vcpkg-asan-debug

'features' is deprecated: Moved into VW namespace. Will be removed in VW 10. [-Wdeprecated-declarations]

Check warning on line 345 in utl/flatbuffer/vw_to_flat.cc

GitHub Actions / asan.macos-latest.vcpkg-asan-debug

'features' is deprecated: Moved into VW namespace. Will be removed in VW 10. [-Wdeprecated-declarations]
{
std::stringstream ss;
ss << index;