Skip to content

Pipeline API

Reference for Pipeline, the high-level runtime interface for building and executing DAG image workflows.

Class definition

cpp
namespace mini_image_pipe {

/// High-level runtime interface for building and executing DAG image workflows.
///
/// Instances are not designed for concurrent mutation from multiple threads.
class Pipeline {
public:
    /// Creates a pipeline from `config`; when omitted, a default-constructed
    /// PipelineConfig is used.
    explicit Pipeline(const PipelineConfig& config = PipelineConfig());
    ~Pipeline();

    /// Registers operator `op` under `name` and returns an integer id. The
    /// usage examples pass that id as the node id to connect(), setInput()
    /// and getOutput().
    int addOperator(const std::string& name, OperatorPtr op);
    /// Connects node `from` to node `to` (ids as returned by addOperator()).
    /// NOTE(review): the meaning of the bool result is not stated on this
    /// page - presumably success/failure; confirm against the implementation.
    bool connect(int from, int to);

    /// Supplies the input image for node `nodeId` as a data pointer plus its
    /// width, height and channel count.
    /// NOTE(review): the examples pass a pointer named `d_input`, presumably
    /// device memory - confirm the expected memory space and ownership.
    void setInput(int nodeId, void* data, int width, int height, int channels);
    /// Returns the output pointer for node `nodeId`; the examples call this
    /// after execute().
    /// NOTE(review): ownership and lifetime of the returned buffer are not
    /// documented here - confirm before freeing or caching it.
    void* getOutput(int nodeId);

    /// Runs the pipeline. Callers compare the result against cudaSuccess to
    /// detect failure.
    cudaError_t execute();
    /// Runs the pipeline over a batch of `inputs` and reports results through
    /// `outputs`; each BatchOutput exposes `nodeId` and `frames`.
    /// PipelineConfig::maxBatchSize is the maximum batch size accepted.
    cudaError_t executeBatch(const std::vector<ImageBuffer>& inputs,
                             std::vector<BatchOutput>& outputs);

    /// Accessors for the underlying task graph. After a failed run, the tasks
    /// from getTasks() can be checked for TaskState::FAILED.
    TaskGraph& getTaskGraph();
    const TaskGraph& getTaskGraph() const;
    /// Accessor for the DAG scheduler used by this pipeline.
    DAGScheduler& getScheduler();

    /// Clears task states for re-execution but preserves graph structure.
    void reset();
};

}  // namespace mini_image_pipe

PipelineConfig

cpp
/// Construction-time options for Pipeline. Every field has a default, so a
/// default-constructed PipelineConfig is usable as-is.
struct PipelineConfig {
    /// CUDA stream count used by the scheduler.
    int numStreams = 4;
    /// Initial host pinned pool target, in bytes (64 MB by default).
    size_t pinnedPoolSize = 64 * 1024 * 1024;
    /// Enables profiling-related runtime hooks.
    bool enableProfiling = false;
    /// Maximum batch size accepted by executeBatch().
    int maxBatchSize = 8;
    /// Enables CUDA Graph capture/replay when possible.
    bool enableCudaGraphs = false;
    /// Prefers stream-ordered async device allocator mode.
    bool preferAsyncAllocator = false;
};
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `numStreams` | `int` | `4` | CUDA stream count used by scheduler |
| `pinnedPoolSize` | `size_t` | 64 MB | Initial host pinned pool target |
| `enableProfiling` | `bool` | `false` | Enables profiling-related runtime hooks |
| `maxBatchSize` | `int` | `8` | Maximum batch size accepted by `executeBatch()` |
| `enableCudaGraphs` | `bool` | `false` | Enables CUDA Graph capture/replay when possible |
| `preferAsyncAllocator` | `bool` | `false` | Prefers stream-ordered async device allocator mode |

Common usage patterns

Single-path pipeline

cpp
// Minimal pipeline: one operator that serves as both the input node and the
// output node. Uses the default PipelineConfig.
Pipeline pipeline;
auto blur = std::make_shared<GaussianBlurOperator>(GaussianKernelSize::KERNEL_5x5);
// addOperator() returns the node id used by the calls below.
int n = pipeline.addOperator("Blur", blur);

// d_input, width, height and channels are provided by the caller.
// NOTE(review): d_input is presumably a device pointer - confirm.
pipeline.setInput(n, d_input, width, height, channels);
// Compare err against cudaSuccess before relying on the output.
cudaError_t err = pipeline.execute();
void* out = pipeline.getOutput(n);

Multi-stage DAG pipeline

cpp
// Three-stage chain (Resize -> Blur -> Sobel) built with an explicit config.
PipelineConfig config;
config.numStreams = 4;  // CUDA stream count used by the scheduler
Pipeline pipeline(config);

// resizeOp, blurOp and sobelOp are operators created by the caller.
int n1 = pipeline.addOperator("Resize", resizeOp);
int n2 = pipeline.addOperator("Blur", blurOp);
int n3 = pipeline.addOperator("Sobel", sobelOp);

// Wire the nodes in order: n1 feeds n2, n2 feeds n3.
pipeline.connect(n1, n2);
pipeline.connect(n2, n3);

// Input goes to the first node; the result is read from the last node.
pipeline.setInput(n1, d_input, width, height, channels);
// Compare err against cudaSuccess before relying on the output.
cudaError_t err = pipeline.execute();
void* out = pipeline.getOutput(n3);

Batch execution

cpp
// Batch of three frames with identical dimensions. `pipeline`, frame0..frame2,
// width, height and channels are provided by the surrounding code.
// NOTE(review): these are positional aggregate initializers, so the value
// order must match the ImageBuffer declaration (not shown on this page) -
// verify the field order before copying this pattern.
std::vector<ImageBuffer> inputs = {
    {frame0, width, height, channels, width * channels, sizeof(uint8_t), 1,
     static_cast<size_t>(width) * height * channels, true, false},
    {frame1, width, height, channels, width * channels, sizeof(uint8_t), 1,
     static_cast<size_t>(width) * height * channels, true, false},
    {frame2, width, height, channels, width * channels, sizeof(uint8_t), 1,
     static_cast<size_t>(width) * height * channels, true, false},
};
// Receives the results of executeBatch().
std::vector<BatchOutput> outputs;

// Compare err against cudaSuccess before consuming `outputs`.
cudaError_t err = pipeline.executeBatch(inputs, outputs);
// Each BatchOutput carries a node id and the frames produced for that node.
for (const auto& output : outputs) {
    std::cout << "Sink node " << output.nodeId << " produced "
              << output.frames.size() << " frames" << std::endl;
}

Error handling

cpp
// Run the pipeline and report failures. `pipeline` is an already-built
// Pipeline instance.
cudaError_t err = pipeline.execute();
if (err != cudaSuccess) {
    // Print the CUDA error string for the returned code.
    std::cerr << "Pipeline failed: " << cudaGetErrorString(err) << std::endl;

    // Walk the task graph and name every task left in the FAILED state.
    for (const auto& task : pipeline.getTaskGraph().getTasks()) {
        // task.state is read through load().
        // NOTE(review): presumably a std::atomic<TaskState> - confirm.
        if (task.state.load() == TaskState::FAILED) {
            std::cerr << "Failed task: " << task.name << std::endl;
        }
    }
}

Threading and lifecycle notes

  • Pipeline instances are not designed for concurrent mutation from multiple threads.
  • MemoryManager is singleton-based and thread-safe for allocation paths.
  • reset() clears task states for re-execution but preserves graph structure.

Released under the MIT License.