Skip to content

Commit

Permalink
feat(clp-s): Unescape string values during ingestion and fix support …
Browse files Browse the repository at this point in the history
…for search using escape sequences. (#622)

Co-authored-by: wraymo <[email protected]>
Co-authored-by: kirkrodrigues <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2025
1 parent 5d3b671 commit 252a789
Show file tree
Hide file tree
Showing 17 changed files with 582 additions and 154 deletions.
2 changes: 0 additions & 2 deletions components/core/src/clp_s/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ set(
../clp/ffi/KeyValuePairLogEvent.hpp
../clp/ffi/SchemaTree.cpp
../clp/ffi/SchemaTree.hpp
../clp/ffi/utils.cpp
../clp/ffi/utils.hpp
../clp/ffi/Value.hpp
../clp/FileDescriptor.cpp
../clp/FileDescriptor.hpp
Expand Down
22 changes: 22 additions & 0 deletions components/core/src/clp_s/ColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "BufferViewReader.hpp"
#include "ColumnWriter.hpp"
#include "Utils.hpp"
#include "VariableDecoder.hpp"

namespace clp_s {
Expand Down Expand Up @@ -88,6 +89,20 @@ void ClpStringColumnReader::extract_string_value_into_buffer(
VariableDecoder::decode_variables_into_message(entry, *m_var_dict, encoded_vars, buffer);
}

void ClpStringColumnReader::extract_escaped_string_value_into_buffer(
uint64_t cur_message,
std::string& buffer
) {
if (false == m_is_array) {
// TODO: escape while decoding instead of after.
std::string tmp;
extract_string_value_into_buffer(cur_message, tmp);
StringUtils::escape_json_string(buffer, tmp);
} else {
extract_string_value_into_buffer(cur_message, buffer);
}
}

int64_t ClpStringColumnReader::get_encoded_id(uint64_t cur_message) {
auto value = m_logtypes[cur_message];
return ClpStringColumnWriter::get_encoded_log_dict_id(value);
Expand Down Expand Up @@ -125,6 +140,13 @@ void VariableStringColumnReader::extract_string_value_into_buffer(
buffer.append(m_var_dict->get_value(m_variables[cur_message]));
}

void VariableStringColumnReader::extract_escaped_string_value_into_buffer(
uint64_t cur_message,
std::string& buffer
) {
StringUtils::escape_json_string(buffer, m_var_dict->get_value(m_variables[cur_message]));
}

int64_t VariableStringColumnReader::get_variable_id(uint64_t cur_message) {
return m_variables[cur_message];
}
Expand Down
17 changes: 17 additions & 0 deletions components/core/src/clp_s/ColumnReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ class BaseColumnReader {
*/
virtual void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) = 0;

/**
* Extracts a value from the column, escapes it, and serializes it into a provided buffer as a
* string.
* @param cur_message
* @param buffer
*/
virtual void
extract_escaped_string_value_into_buffer(uint64_t cur_message, std::string& buffer) {
extract_string_value_into_buffer(cur_message, buffer);
}

private:
int32_t m_id;
};
Expand Down Expand Up @@ -152,6 +163,9 @@ class ClpStringColumnReader : public BaseColumnReader {

void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override;

void
extract_escaped_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override;

/**
* Gets the encoded id of the variable
* @param cur_message
Expand Down Expand Up @@ -196,6 +210,9 @@ class VariableStringColumnReader : public BaseColumnReader {

void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override;

void
extract_escaped_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override;

/**
* Gets the encoded id of the variable
* @param cur_message
Expand Down
66 changes: 15 additions & 51 deletions components/core/src/clp_s/JsonParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "../clp/ffi/ir_stream/IrUnitType.hpp"
#include "../clp/ffi/KeyValuePairLogEvent.hpp"
#include "../clp/ffi/SchemaTree.hpp"
#include "../clp/ffi/utils.hpp"
#include "../clp/ffi/Value.hpp"
#include "../clp/ir/EncodedTextAst.hpp"
#include "../clp/NetworkReader.hpp"
Expand Down Expand Up @@ -125,7 +124,7 @@ void JsonParser::parse_obj_in_array(ondemand::object line, int32_t parent_node_i
size_t object_start = m_current_schema.start_unordered_object(NodeType::Object);
ondemand::field cur_field;
ondemand::value cur_value;
std::string cur_key;
std::string_view cur_key;
int32_t node_id;
while (true) {
while (false == object_stack.empty() && object_it_stack.top() == object_stack.top().end()) {
Expand All @@ -143,7 +142,7 @@ void JsonParser::parse_obj_in_array(ondemand::object line, int32_t parent_node_i
}

cur_field = *object_it_stack.top();
cur_key = std::string_view(cur_field.unescaped_key(true));
cur_key = cur_field.unescaped_key(true);
cur_value = cur_field.value();

switch (cur_value.type()) {
Expand Down Expand Up @@ -193,9 +192,7 @@ void JsonParser::parse_obj_in_array(ondemand::object line, int32_t parent_node_i
break;
}
case ondemand::json_type::string: {
std::string value = std::string(
cur_value.raw_json_token().substr(1, cur_value.raw_json_token().size() - 2)
);
std::string_view value = cur_value.get_string(true);
if (value.find(' ') != std::string::npos) {
node_id = m_archive_writer
->add_node(node_id_stack.top(), NodeType::ClpString, cur_key);
Expand Down Expand Up @@ -271,9 +268,7 @@ void JsonParser::parse_array(ondemand::array array, int32_t parent_node_id) {
break;
}
case ondemand::json_type::string: {
std::string value = std::string(
cur_value.raw_json_token().substr(1, cur_value.raw_json_token().size() - 2)
);
std::string_view value = cur_value.get_string(true);
if (value.find(' ') != std::string::npos) {
node_id = m_archive_writer->add_node(parent_node_id, NodeType::ClpString, "");
} else {
Expand Down Expand Up @@ -308,7 +303,7 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s

ondemand::field cur_field;

std::string cur_key = key;
std::string_view cur_key = key;
node_id_stack.push(parent_node_id);

bool can_match_timestamp = !m_timestamp_column.empty();
Expand All @@ -319,7 +314,7 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s
do {
if (false == object_stack.empty()) {
cur_field = *object_it_stack.top();
cur_key = std::string(std::string_view(cur_field.unescaped_key(true)));
cur_key = cur_field.unescaped_key(true);
line = cur_field.value();
if (may_match_timestamp) {
if (object_stack.size() <= m_timestamp_column.size()
Expand Down Expand Up @@ -415,10 +410,7 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s
break;
}
case ondemand::json_type::string: {
auto raw_json_token = line.raw_json_token();
std::string value
= std::string(raw_json_token.substr(1, raw_json_token.rfind('"') - 1));

std::string_view value = line.get_string(true);
if (matches_timestamp) {
node_id = m_archive_writer->add_node(
node_id_stack.top(),
Expand Down Expand Up @@ -679,18 +671,11 @@ auto JsonParser::add_node_to_archive_and_translations(
NodeType archive_node_type,
int32_t parent_node_id
) -> int {
auto validated_escaped_key
= clp::ffi::validate_and_escape_utf8_string(ir_node_to_add.get_key_name());
std::string node_key;
if (validated_escaped_key.has_value()) {
node_key = validated_escaped_key.value();
} else {
SPDLOG_ERROR("Key is not UTF-8 compliant: \"{}\"", ir_node_to_add.get_key_name());
throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
}
int const curr_node_archive_id
= m_archive_writer->add_node(parent_node_id, archive_node_type, node_key);

int const curr_node_archive_id = m_archive_writer->add_node(
parent_node_id,
archive_node_type,
ir_node_to_add.get_key_name()
);
m_ir_node_to_archive_node_id_mapping.emplace(
std::make_pair(ir_node_id, archive_node_type),
curr_node_archive_id
Expand Down Expand Up @@ -784,23 +769,10 @@ void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) {
m_current_parsed_message.add_value(node_id, b_value);
} break;
case NodeType::VarString: {
auto const validated_escaped_string = clp::ffi::validate_and_escape_utf8_string(
pair.second.value().get_immutable_view<std::string>()
);
std::string str;
if (validated_escaped_string.has_value()) {
str = validated_escaped_string.value();
} else {
SPDLOG_ERROR(
"String is not utf8 compliant: \"{}\"",
pair.second.value().get_immutable_view<std::string>()
);
throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
}
m_current_parsed_message.add_value(node_id, str);
auto const var_value{pair.second.value().get_immutable_view<std::string>()};
m_current_parsed_message.add_value(node_id, var_value);
} break;
case NodeType::ClpString: {
std::string encoded_str;
std::string decoded_value;
if (pair.second.value().is<clp::ir::EightByteEncodedTextAst>()) {
decoded_value = pair.second.value()
Expand All @@ -814,15 +786,7 @@ void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) {
.decode_and_unparse()
.value();
}
auto const validated_escaped_encoded_string
= clp::ffi::validate_and_escape_utf8_string(decoded_value.c_str());
if (validated_escaped_encoded_string.has_value()) {
encoded_str = validated_escaped_encoded_string.value();
} else {
SPDLOG_ERROR("Encoded string is not utf8 compliant: \"{}\"", decoded_value);
throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
}
m_current_parsed_message.add_value(node_id, encoded_str);
m_current_parsed_message.add_value(node_id, decoded_value);
} break;
case NodeType::UnstructuredArray: {
std::string array_str;
Expand Down
23 changes: 19 additions & 4 deletions components/core/src/clp_s/JsonSerializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include <vector>

#include "ColumnReader.hpp"
#include "Utils.hpp"

namespace clp_s {

class JsonSerializer {
public:
Expand Down Expand Up @@ -67,7 +70,11 @@ class JsonSerializer {
return false;
}

void add_special_key(std::string_view const key) { m_special_keys.emplace_back(key); }
void add_special_key(std::string_view const key) {
std::string tmp;
StringUtils::escape_json_string(tmp, key);
m_special_keys.emplace_back(tmp);
}

void begin_object() {
append_key();
Expand Down Expand Up @@ -109,11 +116,11 @@ class JsonSerializer {
m_json_string += "],";
}

void append_key() { append_key(m_special_keys[m_special_keys_index++]); }
void append_key() { append_escaped_key(m_special_keys[m_special_keys_index++]); }

void append_key(std::string_view const key) {
m_json_string += "\"";
m_json_string += key;
StringUtils::escape_json_string(m_json_string, key);
m_json_string += "\":";
}

Expand All @@ -130,11 +137,17 @@ class JsonSerializer {
void
append_value_from_column_with_quotes(clp_s::BaseColumnReader* column, uint64_t cur_message) {
m_json_string += "\"";
column->extract_string_value_into_buffer(cur_message, m_json_string);
column->extract_escaped_string_value_into_buffer(cur_message, m_json_string);
m_json_string += "\",";
}

private:
void append_escaped_key(std::string_view const key) {
m_json_string.push_back('"');
m_json_string.append(key);
m_json_string.append("\":");
}

std::string m_json_string;
std::vector<Op> m_op_list;
std::vector<std::string> m_special_keys;
Expand All @@ -143,4 +156,6 @@ class JsonSerializer {
size_t m_special_keys_index{0};
};

} // namespace clp_s

#endif // CLP_S_JSONSERIALIZER_HPP
Loading

0 comments on commit 252a789

Please sign in to comment.