Skip to content

Pipeline API

Pipeline 是构建并执行 DAG 图像处理流程的高层运行时接口。

类定义

cpp
namespace mini_image_pipe {

class Pipeline {
public:
    explicit Pipeline(const PipelineConfig& config = PipelineConfig());
    ~Pipeline();

    int addOperator(const std::string& name, OperatorPtr op);
    bool connect(int from, int to);

    void setInput(int nodeId, void* data, int width, int height, int channels);
    void* getOutput(int nodeId);

    cudaError_t execute();
    cudaError_t executeBatch(const std::vector<ImageBuffer>& inputs,
                             std::vector<BatchOutput>& outputs);

    TaskGraph& getTaskGraph();
    const TaskGraph& getTaskGraph() const;
    DAGScheduler& getScheduler();

    void reset();
};

}

PipelineConfig

cpp
struct PipelineConfig {
    int numStreams = 4;
    size_t pinnedPoolSize = 64 * 1024 * 1024;
    bool enableProfiling = false;
    int maxBatchSize = 8;
    bool enableCudaGraphs = false;
    bool preferAsyncAllocator = false;
};
选项类型默认值说明
numStreamsint4调度器使用的 CUDA 流数量
pinnedPoolSizesize_t64MBHost pinned 内存池目标规模
enableProfilingboolfalse启用运行时性能分析相关钩子
maxBatchSizeint8executeBatch() 允许的最大批量
enableCudaGraphsboolfalse在可行条件下启用 CUDA Graph 捕获/回放
preferAsyncAllocatorboolfalse优先启用流有序异步设备分配模式

常见使用模式

单路径流水线

cpp
Pipeline pipeline;
auto blur = std::make_shared<GaussianBlurOperator>(GaussianKernelSize::KERNEL_5x5);
int n = pipeline.addOperator("Blur", blur);

pipeline.setInput(n, d_input, width, height, channels);
cudaError_t err = pipeline.execute();
void* out = pipeline.getOutput(n);

多阶段 DAG 流水线

cpp
PipelineConfig config;
config.numStreams = 4;
Pipeline pipeline(config);

int n1 = pipeline.addOperator("Resize", resizeOp);
int n2 = pipeline.addOperator("Blur", blurOp);
int n3 = pipeline.addOperator("Sobel", sobelOp);

pipeline.connect(n1, n2);
pipeline.connect(n2, n3);

pipeline.setInput(n1, d_input, width, height, channels);
cudaError_t err = pipeline.execute();
void* out = pipeline.getOutput(n3);

批量执行

cpp
std::vector<ImageBuffer> inputs = {
    {frame0, width, height, channels, width * channels, sizeof(uint8_t), 1,
     static_cast<size_t>(width) * height * channels, true, false},
    {frame1, width, height, channels, width * channels, sizeof(uint8_t), 1,
     static_cast<size_t>(width) * height * channels, true, false},
    {frame2, width, height, channels, width * channels, sizeof(uint8_t), 1,
     static_cast<size_t>(width) * height * channels, true, false},
};
std::vector<BatchOutput> outputs;

cudaError_t err = pipeline.executeBatch(inputs, outputs);
for (const auto& output : outputs) {
    std::cout << "Sink node " << output.nodeId << " produced "
              << output.frames.size() << " frames" << std::endl;
}

错误处理

cpp
cudaError_t err = pipeline.execute();
if (err != cudaSuccess) {
    std::cerr << "Pipeline 执行失败:" << cudaGetErrorString(err) << std::endl;

    for (const auto& task : pipeline.getTaskGraph().getTasks()) {
        if (task.state.load() == TaskState::FAILED) {
            std::cerr << "失败任务:" << task.name << std::endl;
        }
    }
}

线程与生命周期说明

  • Pipeline 不适合被多个线程并发修改。
  • MemoryManager 基于单例,分配路径线程安全。
  • reset() 会清空任务状态,保留 DAG 结构用于再次执行。

基于 MIT 许可证发布