Skip to content

Latest commit

 

History

History
304 lines (266 loc) · 8.53 KB

0.手写DAG任务调度框架.md

File metadata and controls

304 lines (266 loc) · 8.53 KB

问题:设计一个基于DAG有向无环图的任务调度框架,一个module类,一个executor类,需要处理module的依赖关系,使任务可以正常调度运行。

方法一:利用递归的方式,优先处理依赖module

#include <iostream>
#include <vector>
#include <unordered_map>
#include <string>

using namespace std;

// 定义module基类
class module {
public:
    module(string name, vector<string> deps): name_(name), deps_(deps) {}
    virtual void execute() = 0; // 定义纯虚函数execute
    const string& name() const { return name_; }
    const vector<string>& deps() const { return deps_; }
private:
    string name_;
    vector<string> deps_;
};

// 定义moduleA类,继承自module
class moduleA : public module {
public:
    moduleA(string name, vector<string> deps): module(name, deps) {}
    void execute() override {
        // 这里是moduleA的执行操作
        cout << "Executing moduleA " << name() << endl;
    }
};

// 定义moduleB类,继承自module
class moduleB : public module {
public:
    moduleB(string name, vector<string> deps): module(name, deps) {}
    void execute() override {
        // 这里是moduleB的执行操作
        cout << "Executing moduleB " << name() << endl;
    }
};

// 定义executor类,用于处理调度逻辑
class executor {
public:
    void add_module(module* mod) {
        modules_[mod->name()] = mod;
    }
    void execute_all() {
        // 这里实现DAG的调度逻辑
        unordered_map<string, bool> visited;
        for (auto& mod_pair : modules_) {
            if (!visited[mod_pair.first]) {
                execute(mod_pair.second, visited);
            }
        }
    }
private:
    void execute(module* mod, unordered_map<string, bool>& visited) {
        visited[mod->name()] = true;
        for (auto& dep : mod->deps()) {
            if (!visited[dep]) {
                execute(modules_[dep], visited);
            }
        }
        mod->execute();
    }
    unordered_map<string, module*> modules_;
};

int main() {
    // 创建moduleA和moduleB对象,并指定它们的依赖关系
    moduleA mA1("A1", {});
    moduleA mA2("A2", {"A1"});
    moduleB mB1("B1", {"A1", "A2"});
    moduleA mA3("A3", {"B1"});

    // 创建executor对象,并添加moduleA和moduleB对象
    executor e;
    e.add_module(&mA1);
    e.add_module(&mA2);
    e.add_module(&mB1);
    e.add_module(&mA3);

    // 执行所有modules的操作
    e.execute_all();

    return 0;
}

方法二:利用拓扑排序实现DAG的调度逻辑

#include <iostream>
#include <vector>
#include <unordered_map>
#include <string>
#include <queue>

using namespace std;

// 定义module基类
class module {
public:
    module(string name, vector<string> deps): name_(name), deps_(deps) {}
    virtual void execute() = 0; // 定义纯虚函数execute
    const string& name() const { return name_; }
    const vector<string>& deps() const { return deps_; }
private:
    string name_;
    vector<string> deps_;
};

// 定义moduleA类,继承自module
class moduleA : public module {
public:
    moduleA(string name, vector<string> deps): module(name, deps) {}
    void execute() override {
        // 这里是moduleA的执行操作
        cout << "Executing moduleA " << name() << endl;
    }
};

// 定义moduleB类,继承自module
class moduleB : public module {
public:
    moduleB(string name, vector<string> deps): module(name, deps) {}
    void execute() override {
        // 这里是moduleB的执行操作
        cout << "Executing moduleB " << name() << endl;
    }
};

// 定义executor类,用于处理调度逻辑
class executor {
public:
    void add_module(module* mod) {
        modules_[mod->name()] = mod;
    }
    void execute_all() {
        // 使用拓扑排序实现DAG的调度逻辑
        unordered_map<string, int> in_degree;
        unordered_map<string, vector<module*>> dep_modules;

        // 初始化入度和依赖模块
        for (auto& mod_pair : modules_) {
            const string& mod_name = mod_pair.first;
            const vector<string>& mod_deps = mod_pair.second->deps();
            in_degree[mod_name] = mod_deps.size();
            for (const string& dep_name : mod_deps) {
                dep_modules[dep_name].push_back(mod_pair.second);
            }
        }

        // 将入度为0的模块加入队列
        queue<module*> que;
        for (auto& mod_pair : modules_) {
            if (in_degree[mod_pair.first] == 0) {
                que.push(mod_pair.second);
            }
        }

        // 按照队列中的顺序执行模块
        while (!que.empty()) {
            module* mod = que.front();
            que.pop();
            mod->execute();
            for (module* dep_mod : dep_modules[mod->name()]) {
                if (--in_degree[dep_mod->name()] == 0) {
                    que.push(dep_mod);
                }
            }
        }
    }
private:
    unordered_map<string, module*> modules_;
};

int main() {
    // 创建moduleA和moduleB对象,并指定它们的依赖关系
    moduleA mA1("A1", {});
    moduleA mA2("A2", {"A1"});
    moduleB mB1("B1", {"A1", "A2"});
    moduleA mA3("A3", {"B1"});

    // 创建executor对象,并添加moduleA和moduleB对象
    executor e;
    e.add_module(&mA1);
    e.add_module(&mA2);
    e.add_module(&mB1);
    e.add_module(&mA3);

    // 执行所有modules的操作
    e.execute_all();

    return 0;
}

方法三:利用C++20协程的方式解决(暂无法跑通,解决中)

#include <iostream>
#include <vector>
#include <unordered_map>
#include <string>
#include <coroutine>

using namespace std;

// 定义module基类
class module {
public:
    module(string name, vector<string> deps): name_(name), deps_(deps) {}
    virtual coroutine_handle<> execute() = 0; // 定义协程函数execute
    const string& name() const { return name_; }
    const vector<string>& deps() const { return deps_; }
private:
    string name_;
    vector<string> deps_;
};

// 定义moduleA类,继承自module
class moduleA : public module {
public:
    moduleA(string name, vector<string> deps): module(name, deps) {}
    coroutine_handle<> execute() override {
        // 返回一个协程对象,使用co_yield表示协程挂起
        co_return co_await suspend_always{};
    }
};

// 定义moduleB类,继承自module
class moduleB : public module {
public:
    moduleB(string name, vector<string> deps): module(name, deps) {}
    coroutine_handle<> execute() override {
        // 返回一个协程对象,使用co_yield表示协程挂起
        co_return co_await suspend_always{};
    }
};

// 定义executor类,用于处理调度逻辑
class executor {
public:
    void add_module(module* mod) {
        modules_[mod->name()] = mod;
    }
    void execute_all() {
        // 创建协程对象,并将其挂起
        coroutine_handle<> coro = do_execute_all({});
        coro.resume();
    }
private:
    // do_execute_all为递归函数,用于实现DAG的调度逻辑
    coroutine_handle<> do_execute_all(vector<string> deps) {
        // 如果deps为空,则表示当前的依赖已经全部满足,可以开始执行模块
        if (deps.empty()) {
            co_return; // 直接返回,结束当前协程
        }
        // 遍历所有的模块,查找deps中需要的模块
        for (auto& pair : modules_) {
            auto& mod = pair.second;
            // 如果该模块的依赖包含在deps中,则执行该模块,并将该模块的名称添加到已满足的依赖列表中
            if (all_of(mod->deps().begin(), mod->deps().end(), [&](const string& dep) {
                return find(deps.begin(), deps.end(), dep) != deps.end();
            })) {
                cout << "Executing module " << mod->name() << endl;
                co_await mod->execute(); // 执行模块的协程函数
                deps.push_back(mod->name()); // 将该模块的名称添加到已满足的依赖列表中
                co_yield; // 挂起协程,等待下一次调用
            }
        }
        // 递归调用do_execute_all函数,处理新的依赖列表
        co_await do_execute_all(deps);
    }

    unordered_map<string, module*> modules_;
};

int main() {
   // 创建moduleA和moduleB对象,并指定它们的依赖关系
   moduleA mA1("A1", {});
   moduleA mA2("A2", {"A1"});
   moduleB mB1("B1", {"A1", "A2"});
   moduleA mA3("A3", {"B1"});

   // 创建executor对象,并添加moduleA和moduleB对象
   executor e;
   e.add_module(&mA1);
   e.add_module(&mA2);
   e.add_module(&mB1);
   e.add_module(&mA3);

   // 执行所有modules的操作
   e.execute_all();

   return 0;
}