Skip to content

Commit

Permalink
Code modernization and refactoring in Pipeline, OpSpec and InputOpera…
Browse files Browse the repository at this point in the history
…tor (#5826)

* Use string_view more extensively
* Replace more usages of device string with StorageDevice
* Rename InputOperatorNoCopyMode to InputOperatorCopyMode
* Replace runtime_error with invalid_key when looking up external inputs.

---------

Signed-off-by: Michal Zientkiewicz <[email protected]>
  • Loading branch information
mzient authored Feb 24, 2025
1 parent 2febb1f commit c06fdc2
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 82 deletions.
10 changes: 5 additions & 5 deletions dali/c_api/c_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,17 @@ int PopCurrBatchSize(batch_size_map_t *batch_size_map, int max_batch_size,
*
* @param flags Flags typically specified in daliSetExternalInput* functions.
*/
dali::InputOperatorNoCopyMode GetExternalSourceCopyMode(unsigned int flags) {
dali::InputOperatorNoCopyMode no_copy_mode = dali::InputOperatorNoCopyMode::DEFAULT;
dali::InputOperatorCopyMode GetExternalSourceCopyMode(unsigned int flags) {
dali::InputOperatorCopyMode copy_mode = dali::InputOperatorCopyMode::DEFAULT;
DALI_ENFORCE(!((flags & DALI_ext_force_copy) && (flags & DALI_ext_force_no_copy)),
"External Source cannot be forced to use DALI_ext_force_copy and "
"DALI_ext_force_no_copy at the same time.");
if (flags & DALI_ext_force_copy) {
no_copy_mode = dali::InputOperatorNoCopyMode::FORCE_COPY;
copy_mode = dali::InputOperatorCopyMode::FORCE_COPY;
} else if (flags & DALI_ext_force_no_copy) {
no_copy_mode = dali::InputOperatorNoCopyMode::FORCE_NO_COPY;
copy_mode = dali::InputOperatorCopyMode::FORCE_NO_COPY;
}
return no_copy_mode;
return copy_mode;
}

template <typename Backend>
Expand Down
22 changes: 11 additions & 11 deletions dali/pipeline/operator/builtin/input_operator.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,14 +76,14 @@ struct InputQueueItem {


/**
* @brief Option used to override the InputOperator ``no_copy`` parameter
* @brief Option used to override the InputOperator's copy mode, defined by ``no_copy`` parameter.
*
* It allows to:
* * DEFAULT - leave the default (the ``no_copy`` parameter is used),
* * FORCE_COPY - always make a copy,
* * FORCE_NO_COPY - always share the data without copy.
*/
enum class InputOperatorNoCopyMode {
enum class InputOperatorCopyMode {
DEFAULT,
FORCE_COPY,
FORCE_NO_COPY
Expand All @@ -110,7 +110,7 @@ struct InputOperatorSettingMode {
* @brief Select whether to use the parameter defined in the External Source or
* override the mode of operation forcing the copy or no-copy
*/
InputOperatorNoCopyMode no_copy_mode = InputOperatorNoCopyMode::DEFAULT;
InputOperatorCopyMode copy_mode = InputOperatorCopyMode::DEFAULT;
};


Expand Down Expand Up @@ -214,11 +214,11 @@ class InputOperator : public Operator<Backend>, virtual public BatchSizeProvider
*/
virtual DALIDataType in_dtype() const = 0;

bool WouldCopy(InputOperatorNoCopyMode no_copy) const {
switch (no_copy) {
case InputOperatorNoCopyMode::FORCE_COPY:
bool WouldCopy(InputOperatorCopyMode mode) const {
switch (mode) {
case InputOperatorCopyMode::FORCE_COPY:
return true;
case InputOperatorNoCopyMode::FORCE_NO_COPY:
case InputOperatorCopyMode::FORCE_NO_COPY:
return false;
default:
return !no_copy_;
Expand Down Expand Up @@ -509,11 +509,11 @@ class InputOperator : public Operator<Backend>, virtual public BatchSizeProvider
// pass anything as it is ignored.

bool actual_no_copy = no_copy_;
switch (ext_src_setting_mode.no_copy_mode) {
case InputOperatorNoCopyMode::FORCE_COPY:
switch (ext_src_setting_mode.copy_mode) {
case InputOperatorCopyMode::FORCE_COPY:
actual_no_copy = false;
break;
case InputOperatorNoCopyMode::FORCE_NO_COPY:
case InputOperatorCopyMode::FORCE_NO_COPY:
actual_no_copy = true;
break;
default:
Expand Down
6 changes: 3 additions & 3 deletions dali/pipeline/operator/op_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ OpSpec &OpSpec::AddArgumentInput(std::string arg_name, std::string inp_name) {
return *this;
}

OpSpec &OpSpec::SetInitializedArg(const std::string &arg_name, std::shared_ptr<Argument> arg) {
OpSpec &OpSpec::SetInitializedArg(std::string_view arg_name, std::shared_ptr<Argument> arg) {
if (schema_ && schema_->IsDeprecatedArg(arg_name)) {
const auto &deprecation_meta = schema_->DeprecatedArgInfo(arg_name);
// Argument was removed, and we can discard it
Expand All @@ -137,7 +137,7 @@ OpSpec &OpSpec::SetInitializedArg(const std::string &arg_name, std::shared_ptr<A
if (arg->has_name()) {
arg->set_name(new_arg_name);
}
auto [it, inserted] = argument_idxs_.insert({new_arg_name, arguments_.size()});
auto [it, inserted] = argument_idxs_.emplace(new_arg_name, arguments_.size());
if (inserted)
arguments_.push_back(std::move(arg));
else
Expand All @@ -146,7 +146,7 @@ OpSpec &OpSpec::SetInitializedArg(const std::string &arg_name, std::shared_ptr<A
}
}
EnforceNoAliasWithDeprecated(arg_name);
auto [it, inserted] = argument_idxs_.insert({arg_name, arguments_.size()});
auto [it, inserted] = argument_idxs_.emplace(arg_name, arguments_.size());
if (inserted)
arguments_.push_back(std::move(arg));
else
Expand Down
34 changes: 21 additions & 13 deletions dali/pipeline/operator/op_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class DLL_PUBLIC OpSpec {

/** Add an argument with the given name and value. */
template <typename T>
OpSpec &AddArg(const std::string &name, const T &val) {
OpSpec &AddArg(std::string_view name, const T &val) {
EnforceNoAliasWithDeprecated(name);
DALI_ENFORCE(argument_idxs_.find(name) == argument_idxs_.end(),
make_string("AddArg failed. Argument with name \"", name, "\" already exists. "));
Expand All @@ -117,7 +117,7 @@ class DLL_PUBLIC OpSpec {

/** Add an argument with the given name and value if it doesn't exist already. */
template <typename T>
OpSpec &AddArgIfNotExisting(const std::string &name, const T &val) {
OpSpec &AddArgIfNotExisting(std::string_view name, const T &val) {
if (argument_idxs_.find(name) != argument_idxs_.end()) {
return *this;
}
Expand All @@ -126,22 +126,25 @@ class DLL_PUBLIC OpSpec {

/** Sets or adds an argument with the given name and value. */
template <typename T>
OpSpec &SetArg(const std::string &name, const T &val) {
OpSpec &SetArg(std::string_view name, const T &val) {
using S = argument_storage_t<T>;
return SetInitializedArg(name, Argument::Store<S>(name, static_cast<S>(val)));
return SetInitializedArg(name, Argument::Store<S>(std::string(name), static_cast<S>(val)));
}

/** Sets or adds an argument with the given name and value. */
template <typename T>
OpSpec &SetArg(const std::string &name, const std::vector<T> &val) {
OpSpec &SetArg(std::string_view name, const std::vector<T> &val) {
using S = argument_storage_t<T>;
using V = std::vector<S>;
return SetInitializedArg(name, Argument::Store<V>(name, detail::convert_vector<S>(val)));
return SetInitializedArg(
name,
Argument::Store<V>(std::string(name),
detail::convert_vector<S>(val)));
}


/** Add an instantiated argument with given name */
OpSpec &AddInitializedArg(const std::string &name, std::shared_ptr<Argument> arg) {
OpSpec &AddInitializedArg(std::string_view name, std::shared_ptr<Argument> arg) {
EnforceNoAliasWithDeprecated(name);
DALI_ENFORCE(argument_idxs_.find(name) == argument_idxs_.end(),
make_string("AddArg failed. Argument with name \"", name, "\" already exists. "));
Expand All @@ -153,20 +156,25 @@ class DLL_PUBLIC OpSpec {
*
* @remarks Deprecated arguments are renamed (or dropped, if no longer used).
*/
OpSpec &SetInitializedArg(const std::string &arg_name, std::shared_ptr<Argument> arg);
OpSpec &SetInitializedArg(std::string_view arg_name, std::shared_ptr<Argument> arg);

/** Check if the `arg_name` was already set through a deprecated argument */
void EnforceNoAliasWithDeprecated(std::string_view arg_name);

// Forward to string implementation
template <unsigned N>
OpSpec &SetArg(const std::string &name, const char (&c_str)[N]) {
return this->SetArg<std::string>(name, c_str);
template <size_t N>
OpSpec &SetArg(std::string_view name, const char (&c_str)[N]) {
return this->SetArg(name, std::string(c_str));
}

// Forward to string implementation
OpSpec &SetArg(const std::string &name, const char *c_str) {
return this->SetArg<std::string>(name, c_str);
OpSpec &SetArg(std::string_view name, const char *c_str) {
return this->SetArg(name, std::string(c_str));
}

// Forward to string implementation
OpSpec &SetArg(std::string_view name, const std::string_view &str) {
return this->SetArg(name, std::string(str));
}

/**
Expand Down
67 changes: 40 additions & 27 deletions dali/pipeline/pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ static bool has_prefix(const std::string &operator_name, const std::string& pref
prefix.begin());
}

int Pipeline::AddOperator(const OpSpec &spec, const std::string& inst_name) {
int Pipeline::AddOperator(const OpSpec &spec, std::string_view inst_name) {
return AddOperator(spec, inst_name, GetNextLogicalId());
}

Expand All @@ -256,7 +256,7 @@ int Pipeline::AddOperator(const OpSpec &spec) {
return AddOperator(spec, GetNextLogicalId());
}

int Pipeline::AddOperator(const OpSpec &const_spec, const std::string& inst_name, int logical_id) {
int Pipeline::AddOperator(const OpSpec &const_spec, std::string_view inst_name, int logical_id) {
DALI_ENFORCE(!built_,
"Alterations to the pipeline after \"Build()\" has been called are not allowed");

Expand All @@ -278,14 +278,15 @@ int Pipeline::AddOperator(const OpSpec &const_spec, const std::string& inst_name
return result;
}

int Pipeline::AddOperatorImpl(const OpSpec &const_spec, const std::string &inst_name,
int Pipeline::AddOperatorImpl(const OpSpec &const_spec,
std::string_view inst_name,
int logical_id) {
assert(!built_ && "Already checked by AddOperator()");

DALI_ENFORCE(0 <= logical_id,
"Logical id of the node must be positive, got " + std::to_string(logical_id) + ".");

DALI_ENFORCE(instance_names_.insert(inst_name).second,
DALI_ENFORCE(instance_names_.insert(std::string(inst_name)).second,
make_string("Duplicate operator instance name: \"", inst_name, "\"."));

if (logical_id > next_logical_id_) {
Expand All @@ -304,7 +305,7 @@ int Pipeline::AddOperatorImpl(const OpSpec &const_spec, const std::string &inst_
spec.SetArg("preserve_name", true); // ExternalSource must not be collapsed in CSE

// Take a copy of the passed OpSpec for serialization purposes before any modification
this->op_specs_for_serialization_.push_back({inst_name, spec, logical_id});
this->op_specs_for_serialization_.push_back({std::string(inst_name), spec, logical_id});

DALI_ENFORCE(device != "gpu" || device_id_ != CPU_ONLY_DEVICE_ID,
"Cannot add a GPU operator. Pipeline 'device_id' should not be equal to "
Expand Down Expand Up @@ -403,7 +404,7 @@ bool Pipeline::IsLogicalIdUsed(int logical_id) const {
return logical_ids_.find(logical_id) != logical_ids_.end();
}

void Pipeline::AddToOpSpecs(const std::string &inst_name, const OpSpec &spec, int logical_id) {
void Pipeline::AddToOpSpecs(std::string_view inst_name, const OpSpec &spec, int logical_id) {
auto& logical_group = logical_ids_[logical_id];
if (logical_group.size() > 0) {
const auto &group_name = op_specs_[logical_group.front()].spec.SchemaName();
Expand All @@ -414,7 +415,7 @@ void Pipeline::AddToOpSpecs(const std::string &inst_name, const OpSpec &spec, in
" which is already assigned to " + group_name + ".");
const OpSchema &schema = SchemaRegistry::GetSchema(spec.SchemaName());
}
op_specs_.push_back({inst_name, spec, logical_id});
op_specs_.push_back({std::string(inst_name), spec, logical_id});
logical_ids_[logical_id].push_back(op_specs_.size() - 1);
}

Expand Down Expand Up @@ -495,7 +496,12 @@ void Pipeline::Build(std::vector<PipelineOutputDesc> output_descs) {

if (!it->second.has_contiguous_cpu) {
// Add a make contiguous op to produce this output - we need pipeline outputs to be dense.
auto output_name = AddMakeContiguousNode(it->second, name, "cpu", "cpu", "cpu");
auto output_name = AddMakeContiguousNode(
it->second,
name,
StorageDevice::CPU,
"cpu",
StorageDevice::CPU);
outputs.push_back(output_name);
} else {
outputs.push_back(it->first + "_cpu");
Expand All @@ -514,7 +520,12 @@ void Pipeline::Build(std::vector<PipelineOutputDesc> output_descs) {
ToGPU(it);

if (!it->second.has_contiguous_gpu) {
auto output_name = AddMakeContiguousNode(it->second, name, "gpu", "gpu", "gpu");
auto output_name = AddMakeContiguousNode(
it->second,
name,
StorageDevice::GPU,
"gpu",
StorageDevice::GPU);
outputs.push_back(output_name);
} else {
outputs.push_back(it->first + "_gpu");
Expand Down Expand Up @@ -934,7 +945,7 @@ int Pipeline::num_outputs() const {
return output_descs_.size();
}

std::vector<PipelineOutputDesc> Pipeline::output_descs() const {
const std::vector<PipelineOutputDesc> &Pipeline::output_descs() const & {
return output_descs_;
}

Expand Down Expand Up @@ -987,8 +998,8 @@ void Pipeline::Shutdown() {
}

std::tuple<OpSpec, std::string, std::string> Pipeline::PrepareMakeContiguousNode(
EdgeMeta &meta, const std::string &input_name, const std::string &input_dev,
const std::string &device, const std::string &output_dev) {
EdgeMeta &meta, std::string_view input_name, StorageDevice input_dev,
std::string_view device, StorageDevice output_dev) {
// Prefix for the output name to be generated, so it is distinct after being made contiguous.
const char *cpu_to_cpu_out = "contiguous_cpu_to_cpu_";
const char *gpu_to_gpu_out = "contiguous_gpu_to_gpu_";
Expand All @@ -1002,48 +1013,50 @@ std::tuple<OpSpec, std::string, std::string> Pipeline::PrepareMakeContiguousNode
const char *output_prefix = nullptr;
const char *op_name_prefix = nullptr;

if (input_dev == "cpu" && output_dev == "cpu") {
if (input_dev == StorageDevice::CPU && output_dev == StorageDevice::CPU) {
output_prefix = cpu_to_cpu_out;
op_name_prefix = cpu_to_cpu_name;
} else if (input_dev == "cpu" && output_dev == "gpu") {
} else if (input_dev == StorageDevice::CPU && output_dev == StorageDevice::GPU) {
output_prefix = cpu_to_gpu_out;
op_name_prefix = cpu_to_gpu_name;
} else {
output_prefix = gpu_to_gpu_out;
op_name_prefix = gpu_to_gpu_name;
}

std::string output_name = output_prefix + input_name;
std::string op_name = op_name_prefix + input_name;
std::string output_name = make_string(output_prefix, input_name);
std::string op_name = make_string(op_name_prefix, input_name);

OpSpec spec = OpSpec("MakeContiguous")
.AddArg("device", device)
.AddInput(input_name, ParseStorageDevice(input_dev))
.AddOutput(output_name, ParseStorageDevice(output_dev));
.AddInput(std::string(input_name), input_dev)
.AddOutput(output_name, output_dev);
return {spec, op_name, output_name};
}


std::string Pipeline::AddMakeContiguousNode(EdgeMeta &meta, const std::string &input_name,
const std::string &input_dev, const std::string &device,
const std::string &output_dev) {
std::string Pipeline::AddMakeContiguousNode(EdgeMeta &meta,
std::string_view input_name,
StorageDevice input_dev,
std::string_view device,
StorageDevice output_dev) {
auto [spec, op_name, output_name] =
PrepareMakeContiguousNode(meta, input_name, input_dev, device, output_dev);
std::string output_name_and_device = output_name + "_" + output_dev;
std::string output_name_and_device = make_string(output_name, "_", output_dev);

if ((output_dev == "cpu" && meta.has_make_contiguous_cpu) ||
(output_dev == "gpu" && meta.has_make_contiguous_gpu)) {
if ((output_dev == StorageDevice::CPU && meta.has_make_contiguous_cpu) ||
(output_dev == StorageDevice::GPU && meta.has_make_contiguous_gpu)) {
return output_name_and_device;
}

// Add a make contiguous op to produce this output
auto id = GetNextInternalLogicalId();
AddToOpSpecs(op_name, std::move(spec), id);

if (output_dev == "cpu") {
if (output_dev == StorageDevice::CPU) {
meta.has_make_contiguous_cpu = true;
}
if (output_dev == "gpu") {
if (output_dev == StorageDevice::GPU) {
meta.has_make_contiguous_gpu = true;
}
return output_name_and_device;
Expand Down Expand Up @@ -1098,7 +1111,7 @@ void Pipeline::RepeatLastInputs::Refeed(Pipeline &owner, bool fill_queue) {
node.last_input,
node.data_id,
node.last_input.order(),
InputOperatorSettingMode{false, false, InputOperatorNoCopyMode::FORCE_NO_COPY},
InputOperatorSettingMode{false, false, InputOperatorCopyMode::FORCE_NO_COPY},
true);
}
}
Expand Down
Loading

0 comments on commit c06fdc2

Please sign in to comment.