Simple DAG Example
This example demonstrates creating and executing a simple Directed Acyclic Graph (DAG) with HTS.
Overview
We'll create a data processing pipeline:
Load Data (CPU) → Preprocess (CPU) → GPU Compute → Postprocess (CPU) → Save Results (CPU)
Complete Code
cpp
#include <hts/heterogeneous_task_scheduler.hpp>

#include <chrono>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
using namespace hts;
// Simulated data loading function
void load_data(TaskContext& ctx) {
    using namespace std::chrono_literals;
    std::cout << "[CPU] Loading data from disk..." << std::endl;
    // Stand-in for a real disk read.
    std::this_thread::sleep_for(100ms);
}
// Simulated preprocessing function
void preprocess_data(TaskContext& ctx) {
    using namespace std::chrono_literals;
    std::cout << "[CPU] Preprocessing data (normalization, filtering)..." << std::endl;
    // Stand-in for real CPU-bound work.
    std::this_thread::sleep_for(150ms);
}
// Simulated GPU computation
void gpu_compute(TaskContext& ctx, cudaStream_t stream) {
    using namespace std::chrono_literals;
    std::cout << "[GPU] Running neural network inference..." << std::endl;
    // A real implementation would launch a kernel on `stream` here, e.g.:
    //   my_kernel<<<blocks, threads, 0, stream>>>(data);
    cudaStreamSynchronize(stream);
    std::this_thread::sleep_for(200ms);
}
// Simulated postprocessing
void postprocess_data(TaskContext& ctx) {
    using namespace std::chrono_literals;
    std::cout << "[CPU] Post-processing results..." << std::endl;
    std::this_thread::sleep_for(50ms);
}
// Simulated save function
void save_results(TaskContext& ctx) {
    using namespace std::chrono_literals;
    std::cout << "[CPU] Saving results to disk..." << std::endl;
    // Stand-in for a real disk write.
    std::this_thread::sleep_for(80ms);
}
int main() {
    std::cout << "=== Simple DAG Example ===" << std::endl;

    // ---- Build the task graph ----
    TaskGraph graph;
    TaskBuilder builder(graph);

    // One node per pipeline stage; priorities and tags are hints to the scheduler.
    auto load_task = builder.create_task("LoadData")
        .device(DeviceType::CPU)
        .cpu_func(load_data)
        .priority(10)
        .tag("io")
        .build();

    auto preprocess_task = builder.create_task("Preprocess")
        .device(DeviceType::CPU)
        .cpu_func(preprocess_data)
        .priority(8)
        .tag("cpu-bound")
        .build();

    auto gpu_task = builder.create_task("GPUCompute")
        .device(DeviceType::GPU)
        .gpu_func(gpu_compute)
        .priority(15)
        .memory(256 * 1024 * 1024) // 256 MB
        .build();

    auto postprocess_task = builder.create_task("Postprocess")
        .device(DeviceType::CPU)
        .cpu_func(postprocess_data)
        .priority(8)
        .build();

    auto save_task = builder.create_task("SaveResults")
        .device(DeviceType::CPU)
        .cpu_func(save_results)
        .priority(10)
        .tag("io")
        .build();

    // ---- Wire the linear pipeline: load -> preprocess -> gpu -> post -> save ----
    graph.add_dependency(load_task->id(), preprocess_task->id());
    graph.add_dependency(preprocess_task->id(), gpu_task->id());
    graph.add_dependency(gpu_task->id(), postprocess_task->id());
    graph.add_dependency(postprocess_task->id(), save_task->id());

    // ---- Validate the graph before running ----
    Error err = graph.validate();
    if (!err.ok()) {
        std::cerr << "Graph validation failed: " << err.message() << std::endl;
        return 1;
    }

    std::cout << "\nGraph Statistics:" << std::endl;
    std::cout << " Tasks: " << graph.num_tasks() << std::endl;
    std::cout << " Dependencies: " << graph.num_dependencies() << std::endl;

    // ---- Configure the scheduler ----
    Scheduler scheduler;
    SchedulerConfig config;
    config.cpu_thread_count = 4;
    config.gpu_stream_count = 2;
    config.enable_profiling = true;
    scheduler.configure(config);
    scheduler.set_policy(std::make_unique<GPUPriorityPolicy>()); // prefer GPU work

    // ---- Initialize and execute ----
    err = scheduler.init(&graph);
    if (!err.ok()) {
        std::cerr << "Scheduler initialization failed: " << err.message() << std::endl;
        return 1;
    }

    std::cout << "\nExecuting DAG..." << std::endl;
    const auto start_time = std::chrono::high_resolution_clock::now();
    scheduler.execute();
    scheduler.wait_for_completion();
    const auto end_time = std::chrono::high_resolution_clock::now();
    const auto duration =
        std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);

    // ---- Report results ----
    std::cout << "\n=== Execution Complete ===" << std::endl;
    std::cout << "Total time: " << duration.count() << " ms" << std::endl;
    const auto& stats = scheduler.get_stats();
    std::cout << "Tasks scheduled: " << stats.tasks_scheduled << std::endl;
    std::cout << "CPU tasks: " << stats.cpu_tasks << std::endl;
    std::cout << "GPU tasks: " << stats.gpu_tasks << std::endl;
    std::cout << "Parallelism factor: " << stats.parallelism_factor << "x" << std::endl;
    return 0;
}
Expected Output
=== Simple DAG Example ===
Graph Statistics:
Tasks: 5
Dependencies: 4
Executing DAG...
[CPU] Loading data from disk...
[CPU] Preprocessing data (normalization, filtering)...
[GPU] Running neural network inference...
[CPU] Post-processing results...
[CPU] Saving results to disk...
=== Execution Complete ===
Total time: 580 ms
Tasks scheduled: 5
CPU tasks: 4
GPU tasks: 1
Parallelism factor: 1.0x
Explanation
1. Task Creation
We use TaskBuilder for clean, fluent syntax:
cpp
auto task = builder
.create_task("TaskName")
.device(DeviceType::CPU) // or DeviceType::GPU
.cpu_func(cpu_function) // for CPU tasks
.gpu_func(gpu_function) // for GPU tasks
.priority(10) // relative priority
.build(); // finalize creation
2. Dependencies
Dependencies form the DAG structure:
cpp
// Task B depends on Task A (A must complete before B starts)
graph.add_dependency(A->id(), B->id());
3. Validation
Always validate before execution:
cpp
Error err = graph.validate();
if (!err.ok()) {
std::cerr << "Graph validation failed: " << err.message() << std::endl;
return 1;
}
4. Execution
Three-step execution:
cpp
scheduler.init(&graph); // Prepare
scheduler.execute(); // Start (non-blocking)
scheduler.wait_for_completion(); // Wait for done
Building and Running
CMakeLists.txt
cmake
cmake_minimum_required(VERSION 3.18)
project(simple_dag_example)
set(CMAKE_CXX_STANDARD 17)
find_package(hts REQUIRED)
add_executable(simple_dag main.cpp)
target_link_libraries(simple_dag PRIVATE hts_lib)
Build Commands
bash
mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=Release
make -j$(nproc)
./simple_dag
Variations
Parallel Branches
Create parallel execution branches:
cpp
auto load = builder.create_task("Load").cpu_func(load_data).build();
// Two parallel preprocessing tasks
auto preprocess1 = builder.create_task("Preprocess1").cpu_func(preprocess_chunk1).build();
auto preprocess2 = builder.create_task("Preprocess2").cpu_func(preprocess_chunk2).build();
auto gpu = builder.create_task("GPU").gpu_func(gpu_compute).build();
// Both preprocessing tasks must complete before GPU
graph.add_dependency(load->id(), preprocess1->id());
graph.add_dependency(load->id(), preprocess2->id());
graph.add_dependency(preprocess1->id(), gpu->id());
graph.add_dependency(preprocess2->id(), gpu->id());
/*
Graph:
        ┌─► Preprocess1 ──┐
Load ──►┤                 ├──► GPU
        └─► Preprocess2 ──┘
*/
With Retry Policy
Add fault tolerance:
cpp
auto gpu_task = builder
.create_task("GPUCompute")
.device(DeviceType::GPU)
.gpu_func(risky_kernel)
.retry_policy(RetryPolicy{
.max_retries = 3,
.backoff_ms = 100,
.backoff_multiplier = 2.0f
})
.build();
Next Steps
- Pipeline Example — More complex pipeline with error handling
- Quick Start Guide — Tutorial
- Task Graph Guide — Deep dive into DAGs