DAG 调度器 API
DAGScheduler 是将 DAG 任务映射到 CUDA 流的执行引擎。
类定义
cpp
class DAGScheduler {
public:
explicit DAGScheduler(int numStreams = 4);
~DAGScheduler();
cudaError_t execute(TaskGraph& graph);
void setErrorCallback(std::function<void(int taskId, cudaError_t)> cb);
int getNumStreams() const;
int getTaskStream(int taskId) const;
bool hasSynchronization(int fromTask, int toTask) const;
void setGraphExecutionEnabled(bool enabled);
bool isGraphExecutionEnabled() const;
bool didReplayLastGraph() const;
bool hasCapturedGraph() const;
};核心方法
DAGScheduler(int numStreams = 4)
构造调度器并创建其管理的 CUDA 流。
cpp
PipelineConfig config;
config.numStreams = 4;
Pipeline pipeline(config); // 内部会构造 DAGScheduler(4)execute(TaskGraph& graph)
按拓扑序执行任务,并对跨流依赖插入同步。
cpp
cudaError_t err = pipeline.getScheduler().execute(pipeline.getTaskGraph());setErrorCallback(...)
注册任务执行失败回调。
cpp
scheduler.setErrorCallback([](int taskId, cudaError_t err) {
std::cerr << "任务 " << taskId << " 失败:" << cudaGetErrorString(err) << std::endl;
});执行模型
流分配策略
- 源任务按流索引分布。
- 依赖任务优先选择未被上游任务占用的流。
- 若所有流都被依赖占用,则复用其中一个依赖流。
依赖同步
当依赖跨流时,调度器通过 CUDA event 实现同步:
cpp
cudaEventRecord(event, producerStream);
cudaStreamWaitEvent(consumerStream, event, 0);失败传播
当任务 T 失败时:
T标记为FAILED。- 触发错误回调。
- 所有依赖
T的下游任务递归标记为FAILED。 - 与
T无依赖关系的分支可继续执行。
任务状态
cpp
enum class TaskState {
PENDING,
READY,
RUNNING,
COMPLETED,
FAILED
};CUDA Graph 控制
调度器支持在稳定工作负载下进行图捕获与回放:
cpp
pipeline.getScheduler().setGraphExecutionEnabled(true);可使用以下状态接口辅助诊断:
hasCapturedGraph()didReplayLastGraph()isGraphExecutionEnabled()
调优建议
| 拓扑类型 | 推荐流数 |
|---|---|
| 单算子 / 线性链路 | 1-2 |
| 中等 DAG(3-6 算子) | 2-4 |
| 分叉-合并拓扑 | 4 |
| 高并行复杂 DAG | 4-8 |
建议结合基准页与性能分析工具进行实测调优。