Skip to content

Commit

Permalink
DPL Analysis: Preliminary changes for Table rewrite (#13679)
Browse files Browse the repository at this point in the history
  • Loading branch information
aalkin authored Nov 19, 2024
1 parent 496ce9d commit b0b090f
Show file tree
Hide file tree
Showing 13 changed files with 291 additions and 408 deletions.
348 changes: 166 additions & 182 deletions Framework/Core/include/Framework/ASoA.h

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Framework/Core/include/Framework/ASoAHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ std::vector<BinningIndex> groupTable(const T& table, const BP<Cs...>& binningPol
return groupedIndices;
}

if constexpr (soa::is_soa_filtered_v<T>) {
if constexpr (soa::is_filtered_table<T>) {
selectedRows = table.getSelectedRows(); // vector<int64_t>
}

Expand All @@ -111,7 +111,7 @@ std::vector<BinningIndex> groupTable(const T& table, const BP<Cs...>& binningPol
}
});

if constexpr (soa::is_soa_filtered_v<T>) {
if constexpr (soa::is_filtered_table<T>) {
if (selectedRows[ind] >= selInd + chunkLength) {
selInd += chunkLength;
continue; // Go to the next chunk, no value selected in this chunk
Expand All @@ -120,7 +120,7 @@ std::vector<BinningIndex> groupTable(const T& table, const BP<Cs...>& binningPol

uint64_t ai = 0;
while (ai < chunkLength) {
if constexpr (soa::is_soa_filtered_v<T>) {
if constexpr (soa::is_filtered_table<T>) {
ai += selectedRows[ind] - selInd;
selInd = selectedRows[ind];
}
Expand All @@ -132,7 +132,7 @@ std::vector<BinningIndex> groupTable(const T& table, const BP<Cs...>& binningPol
}
ind++;

if constexpr (soa::is_soa_filtered_v<T>) {
if constexpr (soa::is_filtered_table<T>) {
if (ind >= selectedRows.size()) {
break;
}
Expand All @@ -141,7 +141,7 @@ std::vector<BinningIndex> groupTable(const T& table, const BP<Cs...>& binningPol
}
}

if constexpr (soa::is_soa_filtered_v<T>) {
if constexpr (soa::is_filtered_table<T>) {
if (ind == selectedRows.size()) {
break;
}
Expand Down Expand Up @@ -1348,7 +1348,7 @@ auto combinations(const BP& binningPolicy, int categoryNeighbours, const T1& out
}
}

template <typename... T2s>
template <soa::is_table... T2s>
auto combinations(const o2::framework::expressions::Filter& filter, const T2s&... tables)
{
if constexpr (isSameType<T2s...>()) {
Expand All @@ -1366,7 +1366,7 @@ CombinationsGenerator<P2<T2s...>> combinations(const P2<T2s...>& policy)
return CombinationsGenerator<P2<T2s...>>(policy);
}

template <template <typename...> typename P2, typename... T2s>
template <template <typename...> typename P2, soa::is_table... T2s>
CombinationsGenerator<P2<Filtered<T2s>...>> combinations(P2<T2s...>&&, const o2::framework::expressions::Filter& filter, const T2s&... tables)
{
return CombinationsGenerator<P2<Filtered<T2s>...>>(P2<Filtered<T2s>...>(tables.select(filter)...));
Expand Down
32 changes: 15 additions & 17 deletions Framework/Core/include/Framework/AnalysisHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct WritingCursor<soa::Table<ORIGIN, PC...>> {
template <typename T>
static decltype(auto) extract(T const& arg)
{
if constexpr (soa::is_soa_iterator_v<T>) {
if constexpr (requires(T t) { t.globalIndex(); }) {
return arg.globalIndex();
} else {
static_assert(!framework::has_type<T>(framework::pack<PC...>{}), "Argument type mismatch");
Expand All @@ -104,6 +104,7 @@ struct WritingCursor<soa::Table<ORIGIN, PC...>> {

/// Helper to define output for a Table
template <typename T>
requires soa::is_table<T> || soa::is_iterator<T>
struct OutputForTable {
using table_t = T;
using metadata = typename aod::MetadataTrait<table_t>::metadata;
Expand Down Expand Up @@ -243,16 +244,15 @@ namespace
template <typename T, typename Key>
inline std::shared_ptr<arrow::ChunkedArray> getIndexToKey(arrow::Table* table)
{
using IC = framework::pack_element_t<framework::has_type_at_conditional<soa::is_binding_compatible, Key>(typename T::external_index_columns_t{}), typename T::external_index_columns_t>;
return table->column(framework::has_type_at<IC>(typename T::persistent_columns_t{}));
using IC = framework::pack_element_t<framework::has_type_at_conditional_v<soa::is_binding_compatible, Key>(typename T::external_index_columns_t{}), typename T::external_index_columns_t>;
return table->column(framework::has_type_at_v<IC>(typename T::persistent_columns_t{}));
}

template <typename C>
template <soa::is_column C>
struct ColumnTrait {
static_assert(framework::is_base_of_template_v<o2::soa::Column, C>, "Not a column type!");
using column_t = C;

static constexpr auto listSize()
static consteval auto listSize()
{
if constexpr (std::is_same_v<typename C::type, std::vector<int>>) {
return -1;
Expand Down Expand Up @@ -483,14 +483,14 @@ struct Service {
}
};

template <typename T>
auto getTableFromFilter(const T& table, soa::SelectionVector&& selection)
auto getTableFromFilter(soa::is_filtered_table auto const& table, soa::SelectionVector&& selection)
{
if constexpr (soa::is_soa_filtered_v<std::decay_t<T>>) {
return std::make_unique<o2::soa::Filtered<T>>(std::vector{table}, std::forward<soa::SelectionVector>(selection));
} else {
return std::make_unique<o2::soa::Filtered<T>>(std::vector{table.asArrowTable()}, std::forward<soa::SelectionVector>(selection));
}
return std::make_unique<o2::soa::Filtered<std::decay_t<decltype(table)>>>(std::vector{table}, std::forward<soa::SelectionVector>(selection));
}

auto getTableFromFilter(soa::is_not_filtered_table auto const& table, soa::SelectionVector&& selection)
{
return std::make_unique<o2::soa::Filtered<std::decay_t<decltype(table)>>>(std::vector{table.asArrowTable()}, std::forward<soa::SelectionVector>(selection));
}

void initializePartitionCaches(std::set<uint32_t> const& hashes, std::shared_ptr<arrow::Schema> const& schema, expressions::Filter const& filter, gandiva::NodePtr& tree, gandiva::FilterPtr& gfilter);
Expand Down Expand Up @@ -611,20 +611,18 @@ struct Partition {
namespace o2::soa
{
/// On-the-fly adding of expression columns
template <typename T, typename... Cs>
template <soa::is_table T, soa::is_spawnable_column... Cs>
auto Extend(T const& table)
{
static_assert((soa::is_type_spawnable_v<Cs> && ...), "You can only extend a table with expression columns");
using output_t = Join<T, soa::Table<o2::framework::OriginEnc{"JOIN"}, Cs...>>;
return output_t{{o2::framework::spawner<o2::framework::OriginEnc{"JOIN"}>(framework::pack<Cs...>{}, {table.asArrowTable()}, "dynamicExtension"), table.asArrowTable()}, 0};
}

/// Template function to attach dynamic columns on-the-fly (e.g. inside
/// process() function). Dynamic columns need to be compatible with the table.
template <typename T, typename... Cs>
template <soa::is_table T, soa::is_dynamic_column... Cs>
auto Attach(T const& table)
{
static_assert((framework::is_base_of_template_v<o2::soa::DynamicColumn, Cs> && ...), "You can only attach dynamic columns");
using output_t = Join<T, o2::soa::Table<o2::framework::OriginEnc{"JOIN"}, Cs...>>;
return output_t{{table.asArrowTable()}, table.offset()};
}
Expand Down
49 changes: 6 additions & 43 deletions Framework/Core/include/Framework/AnalysisManagers.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,43 +417,6 @@ struct OutputManager<Builds<T>> {
}
};

template <typename T>
class has_instance
{
using one = char;
struct two {
char x[2];
};

template <typename C>
static one test(decltype(&C::instance));
template <typename C>
static two test(...);

public:
enum { value = sizeof(test<T>(nullptr)) == sizeof(char) };
};

template <typename T>
class has_end_of_stream
{
using one = char;
struct two {
char x[2];
};

template <typename C>
static one test(decltype(&C::endOfStream));
template <typename C>
static two test(...);

public:
enum { value = sizeof(test<T>(nullptr)) == sizeof(char) };
};

template <typename T>
inline constexpr bool has_end_of_stream_v = has_end_of_stream<T>::value;

template <typename T>
struct ServiceManager {
template <typename ANY>
Expand All @@ -477,7 +440,7 @@ struct ServiceManager {

template <typename T>
struct ServiceManager<Service<T>> {
static bool add(std::vector<ServiceSpec>& specs, Service<T>& service)
static bool add(std::vector<ServiceSpec>& specs, Service<T>& /*service*/)
{
if constexpr (o2::framework::is_base_of_template_v<LoadableServicePlugin, T>) {
T p = T{};
Expand All @@ -489,7 +452,7 @@ struct ServiceManager<Service<T>> {

static bool prepare(InitContext& context, Service<T>& service)
{
if constexpr (has_instance<T>::value) {
if constexpr (requires { &T::instance; }) {
service.service = &(T::instance()); // Sigh...
return true;
} else {
Expand All @@ -500,11 +463,11 @@ struct ServiceManager<Service<T>> {
}

/// If a service has a method endOfStream, it is called at the end of the stream.
static bool postRun(EndOfStreamContext& context, Service<T>& service)
static bool postRun(EndOfStreamContext& /*context*/, Service<T>& service)
{
// FIXME: for the moment we only need endOfStream to be
// stateless. In the future we might want to pass it EndOfStreamContext
if constexpr (has_end_of_stream_v<T>) {
if constexpr (requires { &T::endOfStream; }) {
service.service->endOfStream();
return true;
}
Expand Down Expand Up @@ -637,7 +600,7 @@ struct SpawnManager {
static bool requestInputs(std::vector<InputSpec>&, T const&) { return false; }
};

template <typename TABLE>
template <soa::is_table TABLE>
struct SpawnManager<Spawns<TABLE>> {
static bool requestInputs(std::vector<InputSpec>& inputs, Spawns<TABLE>& spawns)
{
Expand All @@ -656,7 +619,7 @@ struct IndexManager {
static bool requestInputs(std::vector<InputSpec>&, T const&) { return false; };
};

template <typename IDX>
template <soa::is_index_table IDX>
struct IndexManager<Builds<IDX>> {
static bool requestInputs(std::vector<InputSpec>& inputs, Builds<IDX>& builds)
{
Expand Down
Loading

0 comments on commit b0b090f

Please sign in to comment.