Skip to content

Commit

Permalink
fix(interactive): Fix primary key check when fetching properties in r…
Browse files Browse the repository at this point in the history
…untime (#4459)

In interactive, we store the primary key in a different way from common
properties. To distinguish the primary key from other properties, we
need to look up the schema at runtime.

Fix #4460

---------

Co-authored-by: liulx20 <[email protected]>
  • Loading branch information
zhanglei1949 and liulx20 authored Feb 6, 2025
1 parent 6f79f20 commit 383a1e0
Show file tree
Hide file tree
Showing 14 changed files with 86 additions and 132 deletions.
76 changes: 16 additions & 60 deletions flex/engines/graph_db/runtime/common/accessors.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class IAccessor {
virtual std::string name() const { return "unknown"; }

virtual std::shared_ptr<IContextColumnBuilder> builder() const {
// LOG(FATAL) << "not implemented for " << this->name();
return nullptr;
}
};
Expand Down Expand Up @@ -94,42 +93,6 @@ class VertexPathAccessor : public IAccessor {
const IVertexColumn& vertex_col_;
};

template <typename KEY_T>
class VertexIdPathAccessor : public IAccessor {
public:
using elem_t = KEY_T;
VertexIdPathAccessor(const GraphReadInterface& graph, const Context& ctx,
int tag)
: graph_(graph),
vertex_col_(*std::dynamic_pointer_cast<IVertexColumn>(ctx.get(tag))) {}

bool is_optional() const override { return vertex_col_.is_optional(); }

elem_t typed_eval_path(size_t idx) const {
const auto& v = vertex_col_.get_vertex(idx);
return AnyConverter<KEY_T>::from_any(graph_.GetVertexId(v.label_, v.vid_));
}

RTAny eval_path(size_t idx) const override {
return RTAny(typed_eval_path(idx));
}

RTAny eval_path(size_t idx, int) const override {
if (!vertex_col_.has_value(idx)) {
return RTAny(RTAnyType::kNull);
}
return RTAny(typed_eval_path(idx));
}

std::shared_ptr<IContextColumnBuilder> builder() const override {
return vertex_col_.builder();
}

private:
const GraphReadInterface& graph_;
const IVertexColumn& vertex_col_;
};

class VertexGIdPathAccessor : public IAccessor {
public:
using elem_t = int64_t;
Expand Down Expand Up @@ -162,27 +125,24 @@ class VertexPropertyPathAccessor : public IAccessor {
VertexPropertyPathAccessor(const GraphReadInterface& graph,
const Context& ctx, int tag,
const std::string& prop_name)
: vertex_col_(*std::dynamic_pointer_cast<IVertexColumn>(ctx.get(tag))) {
: is_optional_(false),
vertex_col_(*std::dynamic_pointer_cast<IVertexColumn>(ctx.get(tag))) {
int label_num = graph.schema().vertex_label_num();
for (int i = 0; i < label_num; ++i) {
property_columns_.emplace_back(
graph.GetVertexColumn<elem_t>(static_cast<label_t>(i), prop_name));
}
}

bool is_optional() const override {
property_columns_.resize(label_num);
const auto& labels = vertex_col_.get_labels_set();
if (vertex_col_.is_optional()) {
return true;
is_optional_ = true;
}
auto label_set = vertex_col_.get_labels_set();
for (auto label : label_set) {
for (auto label : labels) {
property_columns_[label] = graph.GetVertexColumn<T>(label, prop_name);
if (property_columns_[label].is_null()) {
return true;
is_optional_ = true;
}
}
return false;
}

bool is_optional() const override { return is_optional_; }

elem_t typed_eval_path(size_t idx) const {
const auto& v = vertex_col_.get_vertex(idx);
auto& col = property_columns_[v.label_];
Expand Down Expand Up @@ -212,6 +172,7 @@ class VertexPropertyPathAccessor : public IAccessor {
}

private:
bool is_optional_;
const IVertexColumn& vertex_col_;
std::vector<GraphReadInterface::vertex_column_t<elem_t>> property_columns_;
};
Expand Down Expand Up @@ -283,14 +244,13 @@ class ContextValueAccessor : public IAccessor {
const IValueColumn<elem_t>& col_;
};

template <typename KEY_T>
class VertexIdVertexAccessor : public IAccessor {
public:
using elem_t = KEY_T;
VertexIdVertexAccessor(const GraphReadInterface& graph) : graph_(graph) {}
using elem_t = VertexRecord;
VertexIdVertexAccessor() {}

elem_t typed_eval_vertex(label_t label, vid_t v, size_t idx) const {
return AnyConverter<KEY_T>::from_any(graph_.GetVertexId(label, v));
return VertexRecord{label, v};
}

RTAny eval_path(size_t idx) const override {
Expand All @@ -299,18 +259,15 @@ class VertexIdVertexAccessor : public IAccessor {
}

RTAny eval_vertex(label_t label, vid_t v, size_t idx) const override {
return RTAny(Any(typed_eval_vertex(label, v, idx)));
return RTAny::from_vertex(typed_eval_vertex(label, v, idx));
}

RTAny eval_vertex(label_t label, vid_t v, size_t idx, int) const override {
if (v == std::numeric_limits<vid_t>::max()) {
return RTAny(RTAnyType::kNull);
}
return RTAny(typed_eval_vertex(label, v, idx));
return RTAny::from_vertex(typed_eval_vertex(label, v, idx));
}

private:
const GraphReadInterface& graph_;
};

class VertexGIdVertexAccessor : public IAccessor {
Expand Down Expand Up @@ -377,7 +334,6 @@ class VertexPropertyVertexAccessor : public IAccessor {
return true;
}
}

return false;
}

Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/runtime/execute/ops/retrieve/edge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ bl::result<ReadOpBuildResultT> EdgeExpandGetVOprBuilder::Build(
// Exact vertex predicate
label_t exact_pk_label;
std::string pk_name;
if (is_pk_exact_check(v_opr.params().predicate(), exact_pk_label,
if (is_pk_exact_check(schema, v_opr.params().predicate(), exact_pk_label,
pk_name)) {
return std::make_pair(std::make_unique<EdgeExpandVWithExactVertexOpr>(
eep, exact_pk_label, pk_name, v_opr.params()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ bl::result<ReadOpBuildResultT> OrderByOprBuilder::Build(
const auto key = keys[0].first;
const auto order = keys[0].second;

auto func = [key, order, upper](const Context& ctx)
auto func = [key, order, upper, &schema](const Context& ctx)
-> std::optional<std::function<std::optional<std::vector<size_t>>(
const GraphReadInterface& graph, const Context& ctx)>> {
if (key.has_tag() &&
Expand All @@ -123,7 +123,9 @@ bl::result<ReadOpBuildResultT> OrderByOprBuilder::Build(
std::string prop_name = key.property().key().name();
auto vertex_col = std::dynamic_pointer_cast<IVertexColumn>(col);
int label_num = vertex_col->get_labels_set().size();
if (prop_name == "id" && label_num == 1) {
if (label_num == 1 &&
prop_name == schema.get_vertex_primary_key_name(
*vertex_col->get_labels_set().begin())) {
return [=](const GraphReadInterface& graph,
const Context& ctx) -> std::optional<std::vector<size_t>> {
std::vector<size_t> indices;
Expand Down
10 changes: 6 additions & 4 deletions flex/engines/graph_db/runtime/execute/ops/retrieve/path.cc
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ class SPWithoutPredOpr : public IReadOperator {

class ASPOpr : public IReadOperator {
public:
ASPOpr(const physical::PathExpand& opr,
ASPOpr(const gs::Schema& schema, const physical::PathExpand& opr,
const physical::PhysicalOpr_MetaData& meta,
const physical::GetV& get_v_opr, int v_alias) {
int start_tag = opr.start_tag().value();
Expand All @@ -543,7 +543,8 @@ class ASPOpr : public IReadOperator {
CHECK(aspp_.labels.size() == 1) << "only support one label triplet";
CHECK(aspp_.labels[0].src_label == aspp_.labels[0].dst_label)
<< "only support same src and dst label";
CHECK(is_pk_oid_exact_check(get_v_opr.params().predicate(), oid_getter_));
CHECK(is_pk_oid_exact_check(schema, aspp_.labels[0].src_label,
get_v_opr.params().predicate(), oid_getter_));
}

std::string get_operator_name() const override { return "ASPOpr"; }
Expand Down Expand Up @@ -657,7 +658,8 @@ bl::result<ReadOpBuildResultT> SPOprBuilder::Build(
}
std::function<Any(const std::map<std::string, std::string>&)> oid_getter;
if (vertex.has_params() && vertex.params().has_predicate() &&
is_pk_oid_exact_check(vertex.params().predicate(), oid_getter)) {
is_pk_oid_exact_check(schema, spp.labels[0].src_label,
vertex.params().predicate(), oid_getter)) {
return std::make_pair(std::make_unique<SSSDSPOpr>(spp, oid_getter),
ret_meta);
} else {
Expand Down Expand Up @@ -709,7 +711,7 @@ bl::result<ReadOpBuildResultT> SPOprBuilder::Build(
return std::make_pair(nullptr, ContextMeta());
}
return std::make_pair(std::make_unique<ASPOpr>(
plan.plan(op_idx).opr().path(),
schema, plan.plan(op_idx).opr().path(),
plan.plan(op_idx).meta_data(0), vertex, v_alias),
ret_meta);
} else {
Expand Down
8 changes: 3 additions & 5 deletions flex/engines/graph_db/runtime/execute/ops/retrieve/project.cc
Original file line number Diff line number Diff line change
Expand Up @@ -576,9 +576,7 @@ bool is_exchange_index(const common::Expression& expr, int alias, int& tag) {
if (var.has_tag()) {
tag = var.tag().id();
}
// if (tag == alias) {
return true;
//}
}
return false;
}
Expand Down Expand Up @@ -613,7 +611,7 @@ bool is_check_property_in_range(const common::Expression& expr, int& tag,
return false;
}
name = var.property().key().name();
if (name == "id" || name == "label") {
if (name == "label") {
return false;
}
}
Expand Down Expand Up @@ -720,7 +718,7 @@ bool is_check_property_cmp(const common::Expression& expr, int& tag,
return false;
}
name = var.property().key().name();
if (name == "id" || name == "label") {
if (name == "label") {
return false;
}
}
Expand Down Expand Up @@ -797,7 +795,7 @@ bool is_property_extract(const common::Expression& expr, int& tag,
}
if (var.has_property() && var.property().has_key()) {
name = var.property().key().name();
if (name == "id" || name == "label") {
if (name == "label") {
return false;
}
if (var.has_node_type()) {
Expand Down
13 changes: 7 additions & 6 deletions flex/engines/graph_db/runtime/execute/ops/retrieve/select.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ class SelectIdNeOpr : public IReadOperator {
gs::runtime::Context&& ctx, gs::runtime::OprTimer& timer) override {
auto tag = expr_.operators(0).var().tag().id();
auto col = ctx.get(tag);
const auto& name = expr_.operators(0).var().property().key().name();
if ((!col->is_optional()) &&
col->column_type() == ContextColumnType::kVertex) {
auto vertex_col = std::dynamic_pointer_cast<IVertexColumn>(col);
auto labels = vertex_col->get_labels_set();
if (labels.size() == 1) {
if (labels.size() == 1 &&
name == graph.schema().get_vertex_primary_key_name(*labels.begin())) {
auto label = *labels.begin();
int64_t oid = std::stoll(params.at(expr_.operators(2).param().name()));
vid_t vid;
Expand Down Expand Up @@ -88,11 +90,11 @@ class SelectIdNeOpr : public IReadOperator {
common::Expression expr_;
};

class SelectOprBeta : public IReadOperator {
class SelectOpr : public IReadOperator {
public:
SelectOprBeta(const common::Expression& expr) : expr_(expr) {}
SelectOpr(const common::Expression& expr) : expr_(expr) {}

std::string get_operator_name() const override { return "SelectOprBeta"; }
std::string get_operator_name() const override { return "SelectOpr"; }

bl::result<gs::runtime::Context> Eval(
const gs::runtime::GraphReadInterface& graph,
Expand Down Expand Up @@ -128,8 +130,7 @@ bl::result<ReadOpBuildResultT> SelectOprBuilder::Build(
}
}
}
return std::make_pair(std::make_unique<SelectOprBeta>(opr.predicate()),
ctx_meta);
return std::make_pair(std::make_unique<SelectOpr>(opr.predicate()), ctx_meta);
}

} // namespace ops
Expand Down
4 changes: 2 additions & 2 deletions flex/engines/graph_db/runtime/execute/ops/retrieve/vertex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ bl::result<ReadOpBuildResultT> VertexOprBuilder::Build(
{
label_t exact_pk_label;
std::string exact_pk;
if (is_pk_exact_check(vertex.params().predicate(), exact_pk_label,
exact_pk)) {
if (is_pk_exact_check(schema, vertex.params().predicate(),
exact_pk_label, exact_pk)) {
return std::make_pair(
std::make_unique<GetVFromVerticesWithPKExactOpr>(
plan.plan(op_idx).opr().vertex(), p, exact_pk_label,
Expand Down
9 changes: 6 additions & 3 deletions flex/engines/graph_db/runtime/execute/ops/update/load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ parse_edge_mapping(
auto src_mapping = edge_mapping.source_vertex_mappings(0);
auto dst_mapping = edge_mapping.destination_vertex_mappings(0);

CHECK(src_mapping.property().key().name() == "id");
CHECK(dst_mapping.property().key().name() == "id");
CHECK(src_mapping.property().key().name() ==
schema.get_vertex_primary_key_name(src_label_id));
CHECK(dst_mapping.property().key().name() ==
schema.get_vertex_primary_key_name(dst_label_id));

auto src_pk_type = get_vertex_pk_type(schema, src_label_id);
auto dst_pk_type = get_vertex_pk_type(schema, dst_label_id);
Expand Down Expand Up @@ -185,13 +187,14 @@ parse_vertex_mapping(
const auto& props = vertex_mapping.column_mappings();
size_t prop_size = vertex_mapping.column_mappings_size();
std::vector<int> properties(vertex_prop_types.size());
const auto& pk_name = schema.get_vertex_primary_key_name(vertex_label_id);
CHECK(prop_size == vertex_prop_types.size() + 1)
<< "Only support one primary key";
int id_col = -1;
for (size_t j = 0; j < prop_size; ++j) {
const auto& prop = props[j];
const auto& prop_name = prop.property().key().name();
if (prop_name == "id") {
if (prop_name == pk_name) {
id_col = prop.column().index();
} else {
const auto& prop_idx = prop_map.at(prop_name).second;
Expand Down
9 changes: 5 additions & 4 deletions flex/engines/graph_db/runtime/utils/special_predicates.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ inline bool is_label_within_predicate(const common::Expression& expr,
}

inline bool is_pk_oid_exact_check(
const common::Expression& expr,
const gs::Schema& schema, label_t label, const common::Expression& expr,
std::function<Any(const std::map<std::string, std::string>&)>& value) {
if (expr.operators_size() != 3) {
return false;
Expand All @@ -61,7 +61,7 @@ inline bool is_pk_oid_exact_check(
expr.operators(0).var().property().has_key())) {
auto& key = expr.operators(7).var().property().key();
if (!(key.item_case() == common::NameOrId::ItemCase::kName &&
key.name() == "id")) {
key.name() == schema.get_vertex_primary_key_name(label))) {
return false;
}
return false;
Expand Down Expand Up @@ -104,7 +104,8 @@ inline bool is_pk_oid_exact_check(
return false;
}

inline bool is_pk_exact_check(const common::Expression& expr, label_t& label,
inline bool is_pk_exact_check(const gs::Schema& schema,
const common::Expression& expr, label_t& label,
std::string& pk) {
if (expr.operators_size() != 11) {
return false;
Expand Down Expand Up @@ -150,7 +151,7 @@ inline bool is_pk_exact_check(const common::Expression& expr, label_t& label,
expr.operators(7).var().property().has_key())) {
auto& key = expr.operators(7).var().property().key();
if (!(key.item_case() == common::NameOrId::ItemCase::kName &&
key.name() == "id")) {
key.name() == schema.get_vertex_primary_key_name(label))) {
return false;
}
}
Expand Down
5 changes: 3 additions & 2 deletions flex/engines/graph_db/runtime/utils/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ bool vertex_property_topN(bool asc, size_t limit,
const std::string& prop_name,
std::vector<size_t>& offsets) {
std::vector<PropertyType> prop_types;
for (auto l : col->get_labels_set()) {
const auto& labels = col->get_labels_set();
for (auto l : labels) {
const auto& prop_names = graph.schema().get_vertex_property_names(l);
int prop_names_size = prop_names.size();
for (int prop_id = 0; prop_id < prop_names_size; ++prop_id) {
Expand All @@ -187,7 +188,7 @@ bool vertex_property_topN(bool asc, size_t limit,
}
}
}
if (prop_types.empty()) {
if (prop_types.size() != labels.size()) {
return false;
}
for (size_t k = 1; k < prop_types.size(); ++k) {
Expand Down
Loading

0 comments on commit 383a1e0

Please sign in to comment.