Skip to content

Commit

Permalink
Accumulator concept
Browse files Browse the repository at this point in the history
Implements PartialScoreAccumulator concept (only under C++20), and
introduces a slight refactoring of the accumulator API, including
renaming methods and making some methods private.

These changes are part of the effort to stabilize the library API. Concepts
not only help with debugging and compilation time when using a C++20-compatible
compiler, but also serve as programmatically verifiable API
documentation. Thus, even if not always used, they are extremely useful.
  • Loading branch information
elshize committed Feb 7, 2023
1 parent 734ed9f commit 16c3a3b
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 30 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ endif()
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

# Enable concept checks only when building with a standard newer than C++17
# (C++20 and later), because concepts are a C++20 language feature.
# `LESS 98` excludes the legacy value 98 (C++98), which would otherwise
# compare as numerically greater than 17.
if(CMAKE_CXX_STANDARD GREATER 17 AND CMAKE_CXX_STANDARD LESS 98)
    add_compile_definitions(PISA_ENABLE_CONCEPTS=1)
endif()

option(PISA_BUILD_TOOLS "Build command line tools." ON)
option(PISA_ENABLE_TESTING "Enable testing of the library." ON)
Expand Down
12 changes: 12 additions & 0 deletions include/pisa/accumulator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Score Accumulators

Score accumulators are used to accumulate (and later aggregate) document
scores. These are handy for term-at-a-time (TAAT) query processing. Two
implementations are available: `SimpleAccumulator` and
`LazyAccumulator`. They both satisfy the `PartialScoreAccumulator`
concept (when compiled in C++20 mode). For the definition, see
`partial_score_accumulator.hpp`.

`SimpleAccumulator` is a simple wrapper over a `std::vector<float>`,
while `LazyAccumulator` implements some optimizations as described in
`lazy_accumulator.hpp`.
27 changes: 19 additions & 8 deletions include/pisa/accumulator/lazy_accumulator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@
#include <cstddef>
#include <vector>

#include "concepts.hpp"
#include "partial_score_accumulator.hpp"
#include "topk_queue.hpp"

namespace pisa {

/**
* Lazy accumulator fully resets the entire array only once every (1 << counter_bit_size) calls
* to `reset()`. For example, if `counter_bit_size = 3`, then all values are set to 0 every 8th
* reset. To allow for that, the array is partitioned into blocks, each of which has a number of
* accumulators and a descriptor that encodes the last time the block was in use. If it was last
* used before the current query (according to a counter that is reset each cycle), the block is
* wiped out before accumulating another score.
*/
template <int counter_bit_size, typename Descriptor = std::uint64_t>
struct Lazy_Accumulator {
class LazyAccumulator {
using reference = float&;

static_assert(
Expand Down Expand Up @@ -46,11 +56,14 @@ struct Lazy_Accumulator {
}
};

explicit Lazy_Accumulator(std::size_t size)
public:
explicit LazyAccumulator(std::size_t size)
: m_size(size), m_accumulators((size + counters_in_descriptor - 1) / counters_in_descriptor)
{}
{
PISA_ASSERT_CONCEPT(PartialScoreAccumulator<decltype(*this)>);
}

void init()
void reset()
{
if (m_counter == 0) {
auto first = reinterpret_cast<std::byte*>(&m_accumulators.front());
Expand All @@ -60,7 +73,7 @@ struct Lazy_Accumulator {
}
}

void accumulate(std::ptrdiff_t const document, float score)
void accumulate(std::size_t document, float score)
{
auto const block = document / counters_in_descriptor;
auto const pos_in_block = document % counters_in_descriptor;
Expand All @@ -70,7 +83,7 @@ struct Lazy_Accumulator {
m_accumulators[block].accumulators[pos_in_block] += score;
}

void aggregate(topk_queue& topk)
void collect(topk_queue& topk)
{
uint64_t docid = 0U;
for (auto const& block: m_accumulators) {
Expand All @@ -86,8 +99,6 @@ struct Lazy_Accumulator {
}

[[nodiscard]] auto size() const noexcept -> std::size_t { return m_size; }
[[nodiscard]] auto blocks() noexcept -> std::vector<Block>& { return m_accumulators; }
[[nodiscard]] auto counter() const noexcept -> int { return m_counter; }

private:
std::size_t m_size;
Expand Down
60 changes: 60 additions & 0 deletions include/pisa/accumulator/partial_score_accumulator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2023 PISA developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// clang-format off

#pragma once

#ifdef PISA_ENABLE_CONCEPTS

#include <concepts>
#include <cstdint>
#include <iterator>
#include <vector>

#include "topk_queue.hpp"

namespace pisa {

/**
 * Accumulator capable of accumulating partial scores. One document can be accumulated multiple
 * times, and the scores will be summed. Typically used for term-at-a-time (TAAT) processing.
 *
 * A type `T` satisfies this concept when the following expressions are well-formed:
 *  - `a.reset()` and `a.accumulate(docid, score)` for an object `a` declared as `T`,
 *    with `docid` a `std::uint32_t` and `score` a `float`;
 *  - `a.collect(topk)` and `a.size()` for an object `a` declared as `T const`,
 *    with `topk` a `pisa::topk_queue&`, where `a.size()` has exactly the type `std::size_t`.
 *
 * NOTE(review): when `T` is a reference type, the `const` in `T const` is ignored by the
 * language, so `collect`/`size` are then checked on a non-const object -- confirm whether
 * const-callable `collect` is actually intended to be required.
 */
template <typename T>
concept PartialScoreAccumulator = requires(T a, std::uint32_t docid, float score)
{
    /**
     * Resets the accumulator. After a reset, it is ready to be used for the next query.
     */
    a.reset();

    /**
     * Accumulates a partial score for the given document.
     */
    a.accumulate(docid, score);
}
&& requires(T const a, pisa::topk_queue& topk)
{
    /**
     * Pushes results to the top-k priority queue.
     */
    a.collect(topk);

    /**
     * `size()` must return exactly `std::size_t` (no implicit conversions accepted).
     */
    { a.size() } -> std::same_as<std::size_t>;
};

}  // namespace pisa

// clang-format on

#endif
42 changes: 36 additions & 6 deletions include/pisa/accumulator/simple_accumulator.hpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,49 @@
/* Copyright 2023 PISA developers
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

#include "concepts.hpp"
#include "partial_score_accumulator.hpp"
#include "topk_queue.hpp"

namespace pisa {

struct Simple_Accumulator: public std::vector<float> {
explicit Simple_Accumulator(std::ptrdiff_t size) : std::vector<float>(size) {}
void init() { std::fill(begin(), end(), 0.0); }
void accumulate(uint32_t doc, float score) { operator[](doc) += score; }
void aggregate(topk_queue& topk)
/**
* Simple accumulator is an array of scores, where element n is the score of the n-th document.
* Each reset sets all values to 0, and accumulating is done by simply adding the given score to
* the score in the accumulator.
*/
class SimpleAccumulator: public std::vector<float> {
public:
explicit SimpleAccumulator(std::size_t size) : std::vector<float>(size)
{
PISA_ASSERT_CONCEPT(PartialScoreAccumulator<decltype(*this)>);
}

void reset() { std::fill(begin(), end(), 0.0); }

void accumulate(std::uint32_t doc, float score) { operator[](doc) += score; }

void collect(topk_queue& topk)
{
uint64_t docid = 0U;
std::uint32_t docid = 0U;
std::for_each(begin(), end(), [&](auto score) {
if (topk.would_enter(score)) {
topk.insert(score, docid);
Expand Down
31 changes: 31 additions & 0 deletions include/pisa/concepts.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/* Copyright 2023 PISA developers
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#ifdef PISA_ENABLE_CONCEPTS

/**
 * Expands to a `requires` clause with the given constraint expression.
 *
 * The macro is variadic so that constraints containing top-level commas
 * (e.g. `SomeConcept<A, B>`) are forwarded intact instead of being split
 * into several macro arguments by the preprocessor.
 */
#define PISA_REQUIRES(...) \
    requires (__VA_ARGS__)

/**
 * Expands to a `static_assert` over the given constraint expression.
 * Variadic for the same reason as `PISA_REQUIRES`.
 */
#define PISA_ASSERT_CONCEPT(...) \
    static_assert(__VA_ARGS__)

#else

/** Concepts are disabled: the constraint is dropped entirely. */
#define PISA_REQUIRES(...) /**/

/** Concepts are disabled: the assertion is dropped entirely. */
#define PISA_ASSERT_CONCEPT(...) /**/

#endif
5 changes: 4 additions & 1 deletion include/pisa/query/algorithm/range_taat_query.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "accumulator/partial_score_accumulator.hpp"
#include "concepts.hpp"
#include "query/queries.hpp"
#include "topk_queue.hpp"

Expand All @@ -10,13 +12,14 @@ struct range_taat_query {
explicit range_taat_query(topk_queue& topk) : m_topk(topk) {}

template <typename CursorRange, typename Acc>
PISA_REQUIRES(PartialScoreAccumulator<Acc>)
void operator()(CursorRange&& cursors, uint64_t max_docid, size_t range_size, Acc&& accumulator)
{
if (cursors.empty()) {
return;
}

accumulator.init();
accumulator.reset();

for (size_t end = range_size; end + range_size < max_docid; end += range_size) {
process_range(cursors, end, accumulator);
Expand Down
12 changes: 5 additions & 7 deletions include/pisa/query/algorithm/ranked_or_taat_query.hpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
#pragma once

#include "accumulator/partial_score_accumulator.hpp"
#include "concepts.hpp"
#include "query/queries.hpp"
#include "topk_queue.hpp"
#include "util/intrinsics.hpp"

#include "accumulator/simple_accumulator.hpp"

#include "topk_queue.hpp"

namespace pisa {
Expand All @@ -15,20 +12,21 @@ class ranked_or_taat_query {
explicit ranked_or_taat_query(topk_queue& topk) : m_topk(topk) {}

/**
* Term-at-a-time scoring: walks every cursor in `cursors`, adding each posting's score to
* `accumulator` for documents with `docid() < max_docid`, then pushes the accumulated results
* into `m_topk`. Returns immediately, without touching the accumulator, if `cursors` is empty.
*
* NOTE(review): this block comes from a diff view whose +/- markers were lost. The adjacent
* `init()`/`reset()` and `aggregate()`/`collect()` calls look like the before/after lines of
* a method rename (the commit message mentions renaming methods), so presumably only
* `reset()` and `collect()` are live -- confirm against the actual file.
*/
template <typename CursorRange, typename Acc>
PISA_REQUIRES(PartialScoreAccumulator<Acc>)
void operator()(CursorRange&& cursors, uint64_t max_docid, Acc&& accumulator)
{
// Nothing to score.
if (cursors.empty()) {
return;
}
accumulator.init();
accumulator.reset();

// Exhaust each cursor in turn, up to (but excluding) max_docid.
for (auto&& cursor: cursors) {
while (cursor.docid() < max_docid) {
accumulator.accumulate(cursor.docid(), cursor.score());
cursor.next();
}
}
// Hand the accumulated scores over to the top-k queue.
accumulator.aggregate(m_topk);
accumulator.collect(m_topk);
}

std::vector<typename topk_queue::entry_type> const& topk() const { return m_topk.topk(); }
Expand Down
9 changes: 5 additions & 4 deletions test/test_ranked_queries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <functional>

#include "accumulator/lazy_accumulator.hpp"
#include "accumulator/simple_accumulator.hpp"
#include "cursor/block_max_scored_cursor.hpp"
#include "cursor/max_scored_cursor.hpp"
#include "cursor/scored_cursor.hpp"
Expand Down Expand Up @@ -99,14 +100,14 @@ class range_query_128: public range_query<T> {
TEMPLATE_TEST_CASE(
"Ranked query test",
"[query][ranked][integration]",
ranked_or_taat_query_acc<Simple_Accumulator>,
ranked_or_taat_query_acc<Lazy_Accumulator<4>>,
ranked_or_taat_query_acc<SimpleAccumulator>,
ranked_or_taat_query_acc<LazyAccumulator<4>>,
wand_query,
maxscore_query,
block_max_wand_query,
block_max_maxscore_query,
range_query_128<ranked_or_taat_query_acc<Simple_Accumulator>>,
range_query_128<ranked_or_taat_query_acc<Lazy_Accumulator<4>>>,
range_query_128<ranked_or_taat_query_acc<SimpleAccumulator>>,
range_query_128<ranked_or_taat_query_acc<LazyAccumulator<4>>>,
range_query_128<wand_query>,
range_query_128<maxscore_query>,
range_query_128<block_max_wand_query>,
Expand Down
5 changes: 3 additions & 2 deletions tools/evaluate_queries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <tbb/parallel_for.h>

#include "accumulator/lazy_accumulator.hpp"
#include "accumulator/simple_accumulator.hpp"
#include "app.hpp"
#include "cursor/block_max_scored_cursor.hpp"
#include "cursor/max_scored_cursor.hpp"
Expand Down Expand Up @@ -115,7 +116,7 @@ void evaluate_queries(
return topk.topk();
};
} else if (query_type == "ranked_or_taat") {
query_fun = [&, accumulator = Simple_Accumulator(index.num_docs())](Query query) mutable {
query_fun = [&, accumulator = SimpleAccumulator(index.num_docs())](Query query) mutable {
topk_queue topk(k);
ranked_or_taat_query ranked_or_taat_q(topk);
ranked_or_taat_q(
Expand All @@ -124,7 +125,7 @@ void evaluate_queries(
return topk.topk();
};
} else if (query_type == "ranked_or_taat_lazy") {
query_fun = [&, accumulator = Lazy_Accumulator<4>(index.num_docs())](Query query) mutable {
query_fun = [&, accumulator = LazyAccumulator<4>(index.num_docs())](Query query) mutable {
topk_queue topk(k);
ranked_or_taat_query ranked_or_taat_q(topk);
ranked_or_taat_q(
Expand Down
5 changes: 3 additions & 2 deletions tools/queries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <spdlog/spdlog.h>

#include "accumulator/lazy_accumulator.hpp"
#include "accumulator/simple_accumulator.hpp"
#include "app.hpp"
#include "cursor/block_max_scored_cursor.hpp"
#include "cursor/cursor.hpp"
Expand Down Expand Up @@ -257,7 +258,7 @@ void perftest(
return topk.topk().size();
};
} else if (t == "ranked_or_taat" && wand_data_filename) {
Simple_Accumulator accumulator(index.num_docs());
SimpleAccumulator accumulator(index.num_docs());
topk_queue topk(k);
ranked_or_taat_query ranked_or_taat_q(topk);
query_fun = [&, ranked_or_taat_q, accumulator](Query query, Score threshold) mutable {
Expand All @@ -270,7 +271,7 @@ void perftest(
return topk.topk().size();
};
} else if (t == "ranked_or_taat_lazy" && wand_data_filename) {
Lazy_Accumulator<4> accumulator(index.num_docs());
LazyAccumulator<4> accumulator(index.num_docs());
topk_queue topk(k);
ranked_or_taat_query ranked_or_taat_q(topk);
query_fun = [&, ranked_or_taat_q, accumulator](Query query, Score threshold) mutable {
Expand Down

0 comments on commit 16c3a3b

Please sign in to comment.