Pioneer
TaskGraph.h
Go to the documentation of this file.
1 // Copyright © 2008-2023 Pioneer Developers. See AUTHORS.txt for details
2 // Licensed under the terms of the GPL v3. See licenses/GPL-3.txt
3 
4 #pragma once
5 
6 #include <atomic>
7 #include <thread>
8 #include <vector>
9 
10 #include "core/Semaphore.h"
11 
12 struct TaskRange {
13  uint32_t begin;
14  uint32_t end;
15 };
16 
17 class AsyncTaskQueueImpl;
18 class AsyncJobQueueImpl;
19 
20 class JobQueue;
22 class TaskGraph;
23 
24 // CompleteNotifier is a simple atomic helper class to track how many tasks
25 // are running vs complete
27 public:
29  m_dependants(0) {}
30 
31  std::atomic<uint32_t> m_dependants;
32 
33  bool IsComplete() { return m_dependants.load(std::memory_order_relaxed) == 0; }
34 };
35 
36 // A Task is the building block of the TaskGraph system.
37 // It represents a single unit of work to be done in parallel fashion.
38 // Tasks are associated with a numeric range of elements to operate on;
39 // if you have a large number of items to process, create multiple tasks
40 // responsible for a subrange of the overall workload.
41 //
42 // Tasks are managed by raw pointers and should not be deleted by
43 // user code - the owning TaskSet or TaskGraph will delete the task
44 // when it is complete.
45 //
46 // Subclass and implement these methods:
47 // OnExecute: responsible for carrying out the actual work done by the task,
48 // runs on a worker thread.
49 //
50 // OnComplete: used to synchronize reporting of task results, called by the
51 // task owner at a synchronization point on the task owner's thread.
52 class Task {
53 public:
54  Task(TaskRange range = {}) :
55  m_owner(nullptr),
56  m_range(range) {}
57  virtual ~Task() = default;
58 
59  // Runs on a worker thread. Do all work in the task here.
60  virtual void OnExecute(TaskRange range) = 0;
61 
62  // Run by the task owner on the owning thread when the task has finished.
63  // If the task has no owner, this function will not be called.
64  virtual void OnComplete(){};
65 
66  // Sets the task owner (responsible for calling task completion callbacks)
67  void SetOwner(CompleteNotifier *);
68 
69 private:
70  friend class TaskGraph;
71  CompleteNotifier *m_owner;
72  TaskRange m_range;
73 };
74 
75 // Helper task define to enable easy creation with lambdas.
76 template <typename Function>
77 class LambdaTask : public Task {
78 public:
79  LambdaTask(TaskRange r, Function &&lambda) :
80  Task(r),
81  m_lambda(lambda) {}
82 
83  void OnExecute(TaskRange range) override { m_lambda(range); }
84  void OnComplete() override {}
85 
86 private:
87  Function m_lambda;
88 };
89 
90 // Represents a group of related tasks and is responsible for handling
91 // post-task operations on the main thread.
92 class TaskSet : public CompleteNotifier {
93 public:
94  // A Handle is the runtime interface to a currently executing TaskSet.
95  // If you do not call the (blocking) TaskGraph->WaitForTaskSet(handle);
96  // you will need to check for the handle to be complete and manually
97  // trigger execution of the task completion callbacks via
98  // TaskGraph->CompleteTaskSet();
99  struct Handle {
100  Handle(const Handle &) = delete;
101  Handle &operator=(const Handle &) = delete;
102  Handle(Handle &&) = default;
103  Handle &operator=(Handle &&) = default;
104  bool IsComplete() { return !m_set || m_set->IsComplete(); }
105 
106  private:
107  friend class TaskGraph;
108  Handle(TaskSet *set) :
109  m_set(set) {}
110 
111  TaskSet *m_set;
112  };
113 
115  m_tasks{},
116  m_executing(false) {}
117 
118  // Add an individual task to this TaskSet.
119  // This operation will fail if the task set is currently executing.
120  bool AddTask(Task *task);
121 
122  // Adds a lambda task to this TaskSet.
123  template <typename Function>
124  bool AddTaskLambda(TaskRange range, Function &&fn)
125  {
126  return AddTask(new LambdaTask<Function>(range, std::move(fn)));
127  }
128 
129  bool IsExecuting() { return m_executing; }
130 
131 private:
132  friend class TaskGraph;
133  std::vector<Task *> m_tasks;
134  bool m_executing;
135 };
136 
137 // The global task orchestrator.
138 // Responsible for managing threads and executing tasks
139 class TaskGraph {
140 public:
141  TaskGraph();
142  ~TaskGraph();
143 
144  // Set the total number of worker threads
145  void SetWorkerThreads(uint32_t numThreads);
146  uint32_t GetNumWorkerThreads() const;
147 
148  // Queues all tasks in a TaskSet for execution. The TaskGraph now owns
149  // the underlying TaskSet and is responsible for deletion.
151  // Queues a single task without a TaskSet for execution
152  void QueueTask(Task *task);
153 
154  // Queues all tasks in a task set to be executed on the main thread only
156  // Queues a single task to be executed on the main thread only
157  void QueueTaskPinned(Task *task);
158 
159  // Wait for a queued task set to complete and run task completion callbacks.
160  // This will execute tasks on the calling thread until the given TaskSet
161  // handle has completed all queued tasks.
162  void WaitForTaskSet(TaskSet::Handle &set);
163 
164  // Runs completion callbacks for a TaskSet and destroys the underlying
165  // TaskSet object when completed.
166  // Returns false if the task set was not completed.
167  bool CompleteTaskSet(TaskSet::Handle &set);
168 
169  // Run currently available tasks pinned to the main thread
170  void RunPinnedTasks();
171 
172  // Return the JobQueue interface object for this task graph.
174 
175  static uint32_t GetThreadNum();
176 
177 private:
178  friend class TaskGraphJobQueueImpl;
179  struct ThreadData {
180  std::thread *threadHandle;
181  uint32_t threadNum;
182  TaskGraph *graph;
183  bool isJobThread;
184 
185  void RunThread();
186  void WaitForTasks();
187  };
188 
189  static thread_local ThreadData *tl_threadData;
190 
191  bool TryRunTask(ThreadData *thread, bool allowJobs = true);
192  void ExecTask(Task *task);
193  bool HasTasks(ThreadData *thread);
194  static ThreadData *GetThreadData();
195 
196  void WakeForNewTasks();
197  void WakeForFinishedTasks();
198 
199  void WaitForFinishedTask();
200 
201  std::vector<ThreadData *> m_threads;
202 
203  // queue for short-lived high-priority tasks
204  AsyncTaskQueueImpl *m_taskQueue;
205  // queue of tasks to run on the main thread
206  AsyncTaskQueueImpl *m_pinnedTasks;
207 
208  // implementation of the JobQueue interface for backwards compat
209  // with the old JobQueue system
210  TaskGraphJobQueueImpl *m_jobHandlerImpl;
211 
212  // queue for long-lived low-priority background jobs
213  AsyncJobQueueImpl *m_jobQueue;
214  AsyncJobQueueImpl *m_jobFinishedQueue;
215 
216  std::atomic<bool> m_isRunning;
217  std::atomic<uint32_t> m_numAliveThreads;
218 
219  Semaphore m_newTasksSemaphore;
220  Semaphore m_finishedTasksSemaphore;
221 };
Definition: TaskGraph.cpp:21
Definition: TaskGraph.cpp:20
Definition: TaskGraph.h:26
std::atomic< uint32_t > m_dependants
Definition: TaskGraph.h:31
bool IsComplete()
Definition: TaskGraph.h:33
CompleteNotifier()
Definition: TaskGraph.h:28
Definition: JobQueue.h:108
Definition: TaskGraph.h:77
LambdaTask(TaskRange r, Function &&lambda)
Definition: TaskGraph.h:79
void OnComplete() override
Definition: TaskGraph.h:84
void OnExecute(TaskRange range) override
Definition: TaskGraph.h:83
Definition: Semaphore.h:16
Definition: TaskGraph.cpp:26
Definition: TaskGraph.h:139
TaskSet::Handle QueueTaskSetPinned(TaskSet *set)
Definition: TaskGraph.cpp:220
TaskGraph()
Definition: TaskGraph.cpp:100
JobQueue * GetJobQueue()
Definition: TaskGraph.cpp:315
static uint32_t GetThreadNum()
Definition: TaskGraph.cpp:320
void QueueTaskPinned(Task *task)
Definition: TaskGraph.cpp:244
void QueueTask(Task *task)
Definition: TaskGraph.cpp:212
bool CompleteTaskSet(TaskSet::Handle &set)
Definition: TaskGraph.cpp:281
TaskSet::Handle QueueTaskSet(TaskSet *set)
Definition: TaskGraph.cpp:200
void RunPinnedTasks()
Definition: TaskGraph.cpp:299
uint32_t GetNumWorkerThreads() const
Definition: TaskGraph.cpp:168
void WaitForTaskSet(TaskSet::Handle &set)
Definition: TaskGraph.cpp:252
~TaskGraph()
Definition: TaskGraph.cpp:114
void SetWorkerThreads(uint32_t numThreads)
Definition: TaskGraph.cpp:174
Definition: TaskGraph.h:92
bool AddTask(Task *task)
Definition: TaskGraph.cpp:88
bool AddTaskLambda(TaskRange range, Function &&fn)
Definition: TaskGraph.h:124
TaskSet()
Definition: TaskGraph.h:114
bool IsExecuting()
Definition: TaskGraph.h:129
Definition: TaskGraph.h:52
virtual void OnComplete()
Definition: TaskGraph.h:64
void SetOwner(CompleteNotifier *)
Definition: TaskGraph.cpp:78
virtual ~Task()=default
virtual void OnExecute(TaskRange range)=0
Task(TaskRange range={})
Definition: TaskGraph.h:54
Definition: TaskGraph.h:12
uint32_t begin
Definition: TaskGraph.h:13
uint32_t end
Definition: TaskGraph.h:14
Definition: TaskGraph.h:99
Handle & operator=(const Handle &)=delete
Handle(Handle &&)=default
Handle(const Handle &)=delete
bool IsComplete()
Definition: TaskGraph.h:104
Handle & operator=(Handle &&)=default