Skip to content

Commit

Permalink
Add SAMPLING clause, probability-based sample
Browse files Browse the repository at this point in the history
  • Loading branch information
YolandaLyj committed Oct 13, 2022
1 parent 42b0c9e commit 256d9b2
Show file tree
Hide file tree
Showing 29 changed files with 892 additions and 10 deletions.
185 changes: 185 additions & 0 deletions src/common/algorithm/sampler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef COMMON_ALGORITHM_SAMPLER_H_
#define COMMON_ALGORITHM_SAMPLER_H_

#include <cfloat>
#include <ctime>
#include <random>
#include <type_traits>
#include <utility>
#include <vector>

namespace nebula {
namespace algorithm {

namespace {
template <typename T = float>
T UniformRandom() {
static_assert(std::is_floating_point<T>::value,
"Only support float point type");
#if defined(__clang__)
static std::default_random_engine e(std::time(nullptr));
static std::uniform_real_distribution<T> u(0., 1.);
#elif defined(__GNUC__) || defined(__GNUG__)
static thread_local std::default_random_engine e(std::time(nullptr));
static thread_local std::uniform_real_distribution<T> u(0., 1.);
#endif
return u(e);
}
} // namespace

template <typename T>
void Normalization(std::vector<T>& distribution) {
static_assert(std::is_floating_point<T>::value,
"Only support float point type");
T norm_sum = 0.0f;
for (auto& dist : distribution) {
norm_sum += dist;
}
if (norm_sum <= FLT_EPSILON and !distribution.empty()) {
for (size_t i = 0; i < distribution.size(); ++i) {
distribution[i] = 1.0f / static_cast<T>(distribution.size());
}
return;
}
for (size_t i = 0; i < distribution.size(); ++i) {
distribution[i] /= norm_sum;
}
}

// https://en.wikipedia.org/wiki/Alias_method
template <typename T = float>
class AliasSampler {
public:
static_assert(std::is_floating_point<T>::value,
"Only support float point type");
using AliasType = uint32_t;
bool Init(std::vector<T>& distribution);
inline bool Init(const std::vector<T>& distribution);
AliasType Sample() const;
inline size_t Size() const;

private:
std::vector<T> prob_;
std::vector<AliasType> alias_;
};

template <typename T>
bool AliasSampler<T>::Init(std::vector<T>& distribution) {
// normalization sum of distribution to 1
Normalization(distribution);

prob_.resize(distribution.size());
alias_.resize(distribution.size());
std::vector<AliasType> smaller, larger;
smaller.reserve(distribution.size());
larger.reserve(distribution.size());

for (size_t i = 0; i < distribution.size(); ++i) {
prob_[i] = distribution[i] * distribution.size();
if (prob_[i] < 1.0) {
smaller.push_back(i);
} else {
larger.push_back(i);
}
}
// Construct the probability and alias tables
AliasType small, large;
while (!smaller.empty() && !larger.empty()) {
small = smaller.back();
smaller.pop_back();
large = larger.back();
larger.pop_back();
alias_[small] = large;
prob_[large] = prob_[large] + prob_[small] - 1.0;
if (prob_[large] < 1.0) {
smaller.push_back(large);
} else {
larger.push_back(large);
}
}
while (!smaller.empty()) {
small = smaller.back();
smaller.pop_back();
prob_[small] = 1.0;
}
while (!larger.empty()) {
large = larger.back();
larger.pop_back();
prob_[large] = 1.0;
}
return true;
}

template <typename T>
bool AliasSampler<T>::Init(const std::vector<T>& distribution) {
std::vector<T> dist = distribution;
return Init(dist);
}

template <typename T>
typename AliasSampler<T>::AliasType AliasSampler<T>::Sample() const {
AliasType roll = floor(prob_.size() * UniformRandom());
bool coin = UniformRandom() < prob_[roll];
return coin ? roll : alias_[roll];
}

template <typename T>
size_t AliasSampler<T>::Size() const {
return prob_.size();
}

/**
* binary sample in accumulation weights
*/
template <typename T = float>
size_t BinarySampleAcc(const std::vector<T>& accumulate_weights) {
if (accumulate_weights.empty()) {
return 0;
}
T rnd = UniformRandom() * accumulate_weights.back();
size_t low = 0, high = accumulate_weights.size() - 1, mid = 0;
while (low <= high) {
mid = ((high - low) >> 1) + low;
if (rnd < accumulate_weights[mid]) {
if (mid == 0) {
return mid;
}
high = mid - 1;
if (high >= 0 && rnd >= accumulate_weights[high]) {
// rnd in [mid-1, mid)
return mid;
}
} else {
low = mid + 1;
if (low < accumulate_weights.size() && rnd < accumulate_weights[low]) {
// rnd in [mid, mid+1)
return low;
}
}
}
return mid;
}

/**
* binary sample in weights
*/
template <typename T = float>
size_t BinarySample(const std::vector<T>& weights) {
std::vector<T> accumulate_weights(weights.size(), 0.0f);
T cur_weight = 0.0f;
for (size_t i = 0; i < weights.size(); ++i) {
cur_weight += weights[i];
accumulate_weights[i] = cur_weight;
}
Normalization(accumulate_weights);
return BinarySampleAcc(accumulate_weights);
}

} // namespace algorithm
} // namespace nebula
#endif
11 changes: 10 additions & 1 deletion src/graph/context/ast/CypherAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "common/expression/PathBuildExpression.h"
#include "graph/context/ast/AstContext.h"
#include "parser/MatchSentence.h"

#include "graph/planner/plan/Query.h"
namespace nebula {
namespace graph {
enum class CypherClauseKind : uint8_t {
Expand All @@ -22,6 +22,7 @@ enum class CypherClauseKind : uint8_t {
kWhere,
kReturn,
kOrderBy,
kSampling,
kPagination,
kYield,
kShortestPath,
Expand Down Expand Up @@ -113,6 +114,12 @@ struct OrderByClauseContext final : CypherClauseContextBase {
std::vector<std::pair<size_t, OrderFactor::OrderType>> indexedOrderFactors;
};

struct SamplingClauseContext final : CypherClauseContextBase {
SamplingClauseContext() : CypherClauseContextBase(CypherClauseKind::kSampling) {}

std::vector<SamplingParams> indexedSamplingFactors;
};

struct PaginationContext final : CypherClauseContextBase {
PaginationContext() : CypherClauseContextBase(CypherClauseKind::kPagination) {}

Expand Down Expand Up @@ -148,6 +155,7 @@ struct YieldClauseContext final : CypherClauseContextBase {
struct ReturnClauseContext final : CypherClauseContextBase {
ReturnClauseContext() : CypherClauseContextBase(CypherClauseKind::kReturn) {}

std::unique_ptr<SamplingClauseContext> sampling;
std::unique_ptr<OrderByClauseContext> order;
std::unique_ptr<PaginationContext> pagination;
std::unique_ptr<YieldClauseContext> yield;
Expand All @@ -156,6 +164,7 @@ struct ReturnClauseContext final : CypherClauseContextBase {
struct WithClauseContext final : CypherClauseContextBase {
WithClauseContext() : CypherClauseContextBase(CypherClauseKind::kWith) {}

std::unique_ptr<SamplingClauseContext> sampling;
std::unique_ptr<OrderByClauseContext> order;
std::unique_ptr<PaginationContext> pagination;
std::unique_ptr<WhereClauseContext> where;
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ nebula_add_library(
query/UnwindExecutor.cpp
query/SortExecutor.cpp
query/TopNExecutor.cpp
query/SamplingExecutor.cpp
query/IndexScanExecutor.cpp
query/SetExecutor.cpp
query/UnionExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
#include "graph/executor/query/ScanVerticesExecutor.h"
#include "graph/executor/query/SortExecutor.h"
#include "graph/executor/query/TopNExecutor.h"
#include "graph/executor/query/SamplingExecutor.h"
#include "graph/executor/query/TraverseExecutor.h"
#include "graph/executor/query/UnionAllVersionVarExecutor.h"
#include "graph/executor/query/UnionExecutor.h"
Expand Down Expand Up @@ -177,6 +178,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kTopN: {
return pool->makeAndAdd<TopNExecutor>(node, qctx);
}
case PlanNode::Kind::kSampling: {
return pool->makeAndAdd<SamplingExecutor>(node, qctx);
}
case PlanNode::Kind::kFilter: {
return pool->makeAndAdd<FilterExecutor>(node, qctx);
}
Expand Down
131 changes: 131 additions & 0 deletions src/graph/executor/query/SamplingExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "graph/executor/query/SamplingExecutor.h"

#include "common/algorithm/Sampler.h"
#include "graph/planner/plan/Query.h"

namespace nebula {
namespace graph {

using WeightType = float;

folly::Future<Status> SamplingExecutor::execute() {
SCOPED_TIMER(&execTime_);
auto *sampling = asNode<Sampling>(node());
Result result = ectx_->getResult(sampling->inputVar());
auto *iter = result.iterRef();
if (UNLIKELY(iter == nullptr)) {
return Status::Error(
"Internal error: nullptr iterator in sampling executor");
}
if (UNLIKELY(!result.iter()->isSequentialIter())) {
std::stringstream ss;
ss << "Internal error: Sampling executor does not supported "
<< iter->kind();
return Status::Error(ss.str());
}
auto &factors = sampling->factors();
auto size = iter->size();
if (size <= 0) {
iter->clear();
return finish(ResultBuilder()
.value(result.valuePtr())
.iter(std::move(result).iter())
.build());
}
auto colNames = result.value().getDataSet().colNames;
DataSet dataset(std::move(colNames));
for (auto factor : factors) {
if (factor.count <= 0) {
iter->clear();
return finish(ResultBuilder()
.value(result.valuePtr())
.iter(std::move(result).iter())
.build());
}
if (factor.samplingType == SamplingFactor::SamplingType::BINARY) {
executeBinarySample<SequentialIter>(iter, factor.colIdx, factor.count,
dataset);
} else {
executeAliasSample<SequentialIter>(iter, factor.colIdx, factor.count,
dataset);
}
}
return finish(ResultBuilder()
.value(Value(std::move(dataset)))
.iter(Iterator::Kind::kSequential)
.build());
}

template <typename U>
void SamplingExecutor::executeBinarySample(Iterator *iter, size_t index,
size_t count, DataSet &list) {
auto uIter = static_cast<U *>(iter);
std::vector<WeightType> accumulate_weights;
auto it = uIter->begin();
WeightType v;
while (it != uIter->end()) {
v = 1.0;
if ((*it)[index].type() == Value::Type::NULLVALUE) {
LOG(WARNING) << "Sampling type is nullvalue";
} else if ((*it)[index].type() == Value::Type::FLOAT) {
v = (float)((*it)[index].getFloat());
} else if ((*it)[index].type() == Value::Type::INT) {
v = (float)((*it)[index].getInt());
} else {
LOG(WARNING) << "Sampling type is wrong, must be int or float.";
}
if (!accumulate_weights.empty()) {
v += accumulate_weights.back();
}
accumulate_weights.emplace_back(std::move(v));
++it;
}
nebula::algorithm::Normalization<WeightType>(accumulate_weights);
auto beg = uIter->begin();
for (size_t i = 0; i < count; ++i) {
auto idx =
nebula::algorithm::BinarySampleAcc<WeightType>(accumulate_weights);
list.emplace_back(*(beg + idx));
}
uIter->clear();
}

template <typename U>
void SamplingExecutor::executeAliasSample(Iterator *iter, size_t index,
size_t count, DataSet &list) {
auto uIter = static_cast<U *>(iter);
std::vector<WeightType> weights;
auto it = uIter->begin();
WeightType v;
while (it != uIter->end()) {
v = 1.0;
if ((*it)[index].type() == Value::Type::NULLVALUE) {
LOG(WARNING) << "Sampling type is nullvalue";

} else if ((*it)[index].type() == Value::Type::FLOAT) {
v = (float)((*it)[index].getFloat());
} else if ((*it)[index].type() == Value::Type::INT) {
v = (float)((*it)[index].getInt());
} else {
LOG(WARNING) << "Sampling type is wrong, must be int or float.";
}
LOG(ERROR) << "lyj debug v:" << v;
weights.emplace_back(std::move(v));
++it;
}
nebula::algorithm::AliasSampler<WeightType> sampler_;
sampler_.Init(weights);
auto beg = uIter->begin();
for (size_t i = 0; i < count; ++i) {
auto idx = sampler_.Sample();
list.emplace_back(*(beg + idx));
}
uIter->clear();
}

} // namespace graph
} // namespace nebula
Loading

0 comments on commit 256d9b2

Please sign in to comment.