Skip to content

DAG 调度器 API

DAGScheduler 是将 DAG 任务映射到 CUDA 流的执行引擎。

类定义

cpp
class DAGScheduler {
public:
    explicit DAGScheduler(int numStreams = 4);
    ~DAGScheduler();

    cudaError_t execute(TaskGraph& graph);
    void setErrorCallback(std::function<void(int taskId, cudaError_t)> cb);

    int getNumStreams() const;
    int getTaskStream(int taskId) const;
    bool hasSynchronization(int fromTask, int toTask) const;

    void setGraphExecutionEnabled(bool enabled);
    bool isGraphExecutionEnabled() const;
    bool didReplayLastGraph() const;
    bool hasCapturedGraph() const;
};

核心方法

DAGScheduler(int numStreams = 4)

构造调度器并创建其管理的 CUDA 流。

cpp
PipelineConfig config;
config.numStreams = 4;
Pipeline pipeline(config);  // 内部会构造 DAGScheduler(4)

execute(TaskGraph& graph)

按拓扑序执行任务,并对跨流依赖插入同步。

cpp
cudaError_t err = pipeline.getScheduler().execute(pipeline.getTaskGraph());

setErrorCallback(...)

注册任务执行失败回调。

cpp
scheduler.setErrorCallback([](int taskId, cudaError_t err) {
    std::cerr << "任务 " << taskId << " 失败:" << cudaGetErrorString(err) << std::endl;
});

执行模型

流分配策略

  • 源任务按流索引分布。
  • 依赖任务优先选择未被上游任务占用的流。
  • 若所有流都被依赖占用,则复用其中一个依赖流。

依赖同步

当依赖跨流时,调度器通过 CUDA event 实现同步:

cpp
cudaEventRecord(event, producerStream);
cudaStreamWaitEvent(consumerStream, event, 0);

失败传播

当任务 T 失败时:

  1. T 标记为 FAILED
  2. 触发错误回调。
  3. 所有依赖 T 的下游任务递归标记为 FAILED
  4. T 无依赖关系的分支可继续执行。

任务状态

cpp
enum class TaskState {
    PENDING,
    READY,
    RUNNING,
    COMPLETED,
    FAILED
};

CUDA Graph 控制

调度器支持在稳定工作负载下进行图捕获与回放:

cpp
pipeline.getScheduler().setGraphExecutionEnabled(true);

可使用以下状态接口辅助诊断:

  • hasCapturedGraph()
  • didReplayLastGraph()
  • isGraphExecutionEnabled()

调优建议

拓扑类型推荐流数
单算子 / 线性链路1-2
中等 DAG(3-6 算子)2-4
分叉-合并拓扑4
高并行复杂 DAG4-8

建议结合基准页与性能分析工具进行实测调优。

基于 MIT 许可证发布