Pipeline API
Reference for Pipeline, the high-level runtime interface for building and executing DAG image workflows.
Class definition
cpp
namespace mini_image_pipe {
class Pipeline {
public:
explicit Pipeline(const PipelineConfig& config = PipelineConfig());
~Pipeline();
int addOperator(const std::string& name, OperatorPtr op);
bool connect(int from, int to);
void setInput(int nodeId, void* data, int width, int height, int channels);
void* getOutput(int nodeId);
cudaError_t execute();
cudaError_t executeBatch(const std::vector<ImageBuffer>& inputs,
std::vector<BatchOutput>& outputs);
TaskGraph& getTaskGraph();
const TaskGraph& getTaskGraph() const;
DAGScheduler& getScheduler();
void reset();
};
}

PipelineConfig
cpp
struct PipelineConfig {
int numStreams = 4;
size_t pinnedPoolSize = 64 * 1024 * 1024;
bool enableProfiling = false;
int maxBatchSize = 8;
bool enableCudaGraphs = false;
bool preferAsyncAllocator = false;
};

| Option | Type | Default | Description |
|---|---|---|---|
| numStreams | int | 4 | CUDA stream count used by scheduler |
| pinnedPoolSize | size_t | 64 MiB (64 * 1024 * 1024 bytes) | Initial host pinned pool target |
| enableProfiling | bool | false | Enables profiling-related runtime hooks |
| maxBatchSize | int | 8 | Maximum batch size accepted by executeBatch() |
| enableCudaGraphs | bool | false | Enables CUDA Graph capture/replay when possible |
| preferAsyncAllocator | bool | false | Prefers stream-ordered async device allocator mode |

Common usage patterns
Single-path pipeline
cpp
Pipeline pipeline;
auto blur = std::make_shared<GaussianBlurOperator>(GaussianKernelSize::KERNEL_5x5);
int n = pipeline.addOperator("Blur", blur);
pipeline.setInput(n, d_input, width, height, channels);
cudaError_t err = pipeline.execute();
void* out = pipeline.getOutput(n);

Multi-stage DAG pipeline
cpp
PipelineConfig config;
config.numStreams = 4;
Pipeline pipeline(config);
int n1 = pipeline.addOperator("Resize", resizeOp);
int n2 = pipeline.addOperator("Blur", blurOp);
int n3 = pipeline.addOperator("Sobel", sobelOp);
pipeline.connect(n1, n2);
pipeline.connect(n2, n3);
pipeline.setInput(n1, d_input, width, height, channels);
cudaError_t err = pipeline.execute();
void* out = pipeline.getOutput(n3);

Batch execution
cpp
std::vector<ImageBuffer> inputs = {
{frame0, width, height, channels, width * channels, sizeof(uint8_t), 1,
static_cast<size_t>(width) * height * channels, true, false},
{frame1, width, height, channels, width * channels, sizeof(uint8_t), 1,
static_cast<size_t>(width) * height * channels, true, false},
{frame2, width, height, channels, width * channels, sizeof(uint8_t), 1,
static_cast<size_t>(width) * height * channels, true, false},
};
std::vector<BatchOutput> outputs;
cudaError_t err = pipeline.executeBatch(inputs, outputs);
for (const auto& output : outputs) {
std::cout << "Sink node " << output.nodeId << " produced "
<< output.frames.size() << " frames" << std::endl;
}

Error handling
cpp
cudaError_t err = pipeline.execute();
if (err != cudaSuccess) {
std::cerr << "Pipeline failed: " << cudaGetErrorString(err) << std::endl;
for (const auto& task : pipeline.getTaskGraph().getTasks()) {
if (task.state.load() == TaskState::FAILED) {
std::cerr << "Failed task: " << task.name << std::endl;
}
}
}

Threading and lifecycle notes
- `Pipeline` instances are not designed for concurrent mutation from multiple threads.
- `MemoryManager` is singleton-based and thread-safe for allocation paths.
- `reset()` clears task states for re-execution but preserves graph structure.