# Pipeline API
Pipeline 是构建并执行 DAG 图像处理流程的高层运行时接口。
## 类定义
```cpp
namespace mini_image_pipe {

// High-level runtime that builds and executes a DAG of image-processing operators.
// Not suitable for concurrent modification from multiple threads.
class Pipeline {
public:
    // Creates a pipeline configured by `config` (see PipelineConfig).
    explicit Pipeline(const PipelineConfig& config = PipelineConfig());
    ~Pipeline();

    // Registers `op` under `name` and returns its node id; that id is what
    // connect(), setInput() and getOutput() take.
    int addOperator(const std::string& name, OperatorPtr op);

    // Adds the DAG edge `from` -> `to`; returns whether the edge was accepted.
    // NOTE(review): exact failure conditions (unknown ids, cycles, ...) are not
    // documented here — confirm against the implementation.
    bool connect(int from, int to);

    // Binds an input image (width x height x channels) to node `nodeId`.
    // NOTE(review): ownership of `data` presumably stays with the caller — confirm.
    void setInput(int nodeId, void* data, int width, int height, int channels);

    // Returns the output buffer of node `nodeId`; read it after execute() succeeds.
    // NOTE(review): lifetime/ownership of the returned buffer — confirm.
    void* getOutput(int nodeId);

    // Executes the DAG once. On failure, per-task state can be inspected via
    // getTaskGraph() (see the error-handling section).
    cudaError_t execute();

    // Executes the pipeline over a batch of inputs (at most
    // PipelineConfig::maxBatchSize); `outputs` receives BatchOutput entries
    // (sink node id plus the frames it produced).
    cudaError_t executeBatch(const std::vector<ImageBuffer>& inputs,
                             std::vector<BatchOutput>& outputs);

    // Underlying task graph, e.g. to find tasks in TaskState::FAILED after an error.
    TaskGraph& getTaskGraph();
    const TaskGraph& getTaskGraph() const;

    // Scheduler driving execution on PipelineConfig::numStreams CUDA streams.
    DAGScheduler& getScheduler();

    // Clears task execution state but keeps the DAG structure, so the pipeline
    // can be executed again.
    void reset();
};

}  // namespace mini_image_pipe
```

## PipelineConfig
```cpp
struct PipelineConfig {
    int numStreams = 4;                        // CUDA streams used by the scheduler
    size_t pinnedPoolSize = 64 * 1024 * 1024;  // target size of the host pinned-memory pool (64 MiB)
    bool enableProfiling = false;              // enable runtime profiling hooks
    int maxBatchSize = 8;                      // maximum batch size accepted by executeBatch()
    bool enableCudaGraphs = false;             // use CUDA Graph capture/replay where feasible
    bool preferAsyncAllocator = false;         // prefer stream-ordered async device allocation
};
```

| 选项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| `numStreams` | `int` | `4` | 调度器使用的 CUDA 流数量 |
| `pinnedPoolSize` | `size_t` | 64 MiB（`64 * 1024 * 1024`） | Host pinned 内存池的目标大小 |
| `enableProfiling` | `bool` | `false` | 启用运行时性能分析相关钩子 |
| `maxBatchSize` | `int` | `8` | `executeBatch()` 单次允许的最大批量 |
| `enableCudaGraphs` | `bool` | `false` | 在条件允许时启用 CUDA Graph 捕获/回放 |
| `preferAsyncAllocator` | `bool` | `false` | 优先使用流有序（stream-ordered）的异步设备内存分配 |
## 常见使用模式

### 单路径流水线
```cpp
Pipeline pipeline;  // default PipelineConfig
auto blur = std::make_shared<GaussianBlurOperator>(GaussianKernelSize::KERNEL_5x5);
int n = pipeline.addOperator("Blur", blur);
pipeline.setInput(n, d_input, width, height, channels);
cudaError_t err = pipeline.execute();
if (err == cudaSuccess) {
    // Only read the output once execute() has succeeded.
    void* out = pipeline.getOutput(n);
    // ... consume out ...
}
```

### 多阶段 DAG 流水线
```cpp
PipelineConfig config;
config.numStreams = 4;
Pipeline pipeline(config);
// resizeOp / blurOp / sobelOp are OperatorPtr instances, created e.g. with
// std::make_shared as in the single-path example.
int n1 = pipeline.addOperator("Resize", resizeOp);
int n2 = pipeline.addOperator("Blur", blurOp);
int n3 = pipeline.addOperator("Sobel", sobelOp);
// connect() reports whether the edge was accepted; only run a fully built DAG.
bool built = pipeline.connect(n1, n2) && pipeline.connect(n2, n3);
if (built) {
    pipeline.setInput(n1, d_input, width, height, channels);  // entry node
    cudaError_t err = pipeline.execute();
    if (err == cudaSuccess) {
        void* out = pipeline.getOutput(n3);  // output of the last stage
        // ... consume out ...
    }
}
```

### 批量执行
```cpp
// One ImageBuffer per frame; inputs.size() must not exceed PipelineConfig::maxBatchSize.
const size_t frameBytes = static_cast<size_t>(width) * height * channels;
// Positional aggregate initialization: the values must follow ImageBuffer's field order.
auto makeFrame = [&](auto data) {
    return ImageBuffer{data, width, height, channels, width * channels,
                       sizeof(uint8_t), 1, frameBytes, true, false};
};
std::vector<ImageBuffer> inputs = {makeFrame(frame0), makeFrame(frame1), makeFrame(frame2)};
std::vector<BatchOutput> outputs;
cudaError_t err = pipeline.executeBatch(inputs, outputs);
if (err == cudaSuccess) {
    for (const auto& output : outputs) {
        std::cout << "Sink node " << output.nodeId << " produced "
                  << output.frames.size() << " frames" << '\n';
    }
}
```

## 错误处理
```cpp
cudaError_t err = pipeline.execute();
if (err != cudaSuccess) {
    // std::cerr is unit-buffered, so '\n' is enough; std::endl would only add a flush.
    std::cerr << "Pipeline 执行失败:" << cudaGetErrorString(err) << '\n';
    // Report every task that ended in the FAILED state.
    for (const auto& task : pipeline.getTaskGraph().getTasks()) {
        if (task.state.load() == TaskState::FAILED) {
            std::cerr << "失败任务:" << task.name << '\n';
        }
    }
}
```

## 线程与生命周期说明
`Pipeline` 不适合被多个线程并发修改；如需跨线程使用，应由调用方自行加锁。`MemoryManager` 基于单例实现，其分配路径是线程安全的。`reset()` 会清空任务状态，但保留 DAG 结构，以便再次执行。