Pioneer
JobQueue.h
Go to the documentation of this file.
1 // Copyright © 2008-2023 Pioneer Developers. See AUTHORS.txt for details
2 // Licensed under the terms of the GPL v3. See licenses/GPL-3.txt
3 
4 #ifndef JOBQUEUE_H
5 #define JOBQUEUE_H
6 
7 #include "SDL_thread.h"
8 #include "core/TaskGraph.h"
9 #include <atomic>
10 #include <cassert>
11 #include <deque>
12 #include <set>
13 #include <string>
14 #include <vector>
15 
// Thread-count limit shared by users of this header.
// NOTE(review): not referenced anywhere in this header; presumably caps the
// number of worker threads -- confirm against the queue implementations.
static constexpr uint32_t MAX_THREADS = 64;

// Forward declarations: Job::Handle stores pointers to both types before
// their full definitions appear further down.
class JobClient;
class JobQueue;
20 
21 // represents a single unit of work that you want done
22 // subclass and implement:
23 //
24 // OnRun: called from worker thread, and does the actual stuff you want done.
25 // store all your data in the object itself.
26 //
27 // OnFinish: called from the main thread once the worker completes the job.
28 // this is where you deliver the results from the worker
29 //
30 // OnCancel: optional. called from the main thread to tell the job that its
31 // results are not wanted. it should arrange for OnRun to return
32 // as quickly as possible. OnFinish will not be called for the job
33 class Job {
34 public:
35  // This is the RAII handle for a queued Job. A job is cancelled when the
36  // Job::Handle is destroyed. There is at most one Job::Handle for each Job
37  // (non-queued Jobs have no handle). Job::Handle is not copyable only
38  // moveable.
39  class Handle {
40  public:
41  Handle() :
42  m_id(++s_nextId),
43  m_job(nullptr),
44  m_queue(nullptr),
45  m_client(nullptr) {}
46  Handle(Handle &&other);
47  Handle &operator=(Handle &&other);
48  ~Handle();
49 
50  Handle(const Handle &) = delete;
51  Handle &operator=(const Handle &) = delete;
52 
53  bool HasJob() const { return m_job != nullptr; }
54  Job *GetJob() const { return m_job; }
55 
56  bool operator<(const Handle &other) const { return m_id < other.m_id; }
57 
58  private:
59  friend class Job;
60  friend class AsyncJobQueue;
61  friend class SyncJobQueue;
62  friend class TaskGraphJobQueueImpl;
63 
64  Handle(Job *job, JobQueue *queue, JobClient *client);
65  void Unlink();
66 
67  static Uint64 s_nextId;
68 
69  Uint64 m_id;
70  Job *m_job;
71  JobQueue *m_queue;
72  JobClient *m_client;
73  };
74 
75 public:
76  Job() :
77  cancelled(false),
78  m_handle(nullptr) {}
79  virtual ~Job();
80 
81  Job(const Job &) = delete;
82  Job &operator=(const Job &) = delete;
83 
84  virtual void OnRun() = 0;
85  virtual void OnFinish() = 0;
86  virtual void OnCancel() {}
87 
88 private:
89  friend class AsyncJobQueue;
90  friend class SyncJobQueue;
91  friend class JobRunner;
92 
93  // TaskGraph stuff
94  friend class TaskGraph;
95  friend class TaskGraphJobQueueImpl;
96 
97  void UnlinkHandle();
98  const Handle *GetHandle() const { return m_handle; }
99  void SetHandle(Handle *handle) { m_handle.store(handle, std::memory_order_release); }
100  void ClearHandle() { m_handle = nullptr; }
101 
102  std::atomic<bool> cancelled;
103  std::atomic<Handle *> m_handle;
104 };
105 
106 // the queue management class. create one from the main thread, and feed your
107 // jobs to it. it will take care of the rest
108 class JobQueue {
109 public:
110  JobQueue() = default;
111  JobQueue(const JobQueue &) = delete;
112  JobQueue &operator=(const JobQueue &) = delete;
113  virtual ~JobQueue() {}
114 
115  // call from the main thread to add a job to the queue. the job should be
116  // allocated with new. the queue will delete it once its its completed
117  virtual Job::Handle Queue(Job *job, JobClient *client = nullptr) = 0;
118 
119  // Call from the main thread to cancel a job.
120  // The job will not be run if it is not already executing, and OnFinished
121  // will not be called for the job. OnCancel will be called for the job
122  // when it is eventually processed in a FinishJobs() call
123  virtual void Cancel(Job *job) = 0;
124 
125  // call from the main loop. this will call OnFinish for any finished jobs,
126  // and then delete all finished and cancelled jobs. returns the number of
127  // finished jobs (not cancelled)
128  virtual Uint32 FinishJobs() = 0;
129 };
130 
131 class SyncJobQueue : public JobQueue {
132 public:
133  SyncJobQueue() = default;
134  virtual ~SyncJobQueue();
135 
136  virtual Job::Handle Queue(Job *job, JobClient *client = nullptr) override;
137  virtual void Cancel(Job *job) override;
138  virtual Uint32 FinishJobs() override;
139 
140  Uint32 RunJobs(Uint32 count = 1);
141 
142 private:
143  std::deque<Job *> m_queue;
144  std::deque<Job *> m_finished;
145 };
146 
147 // JobClient is an abstraction to allow transparent management of job handles
148 class JobClient {
149 public:
150  virtual void Order(Job *job) = 0;
151  virtual void RemoveJob(Job::Handle *handle) = 0;
152  virtual ~JobClient() {}
153 };
154 
155 // JobSet provides an interface for "fire and forget" jobs - call Order with your job,
156 // and JobSet will keep the handle alive until the job has finished.
157 class JobSet : public JobClient {
158 public:
159  JobSet(JobQueue *queue) :
160  m_queue(queue) {}
161  JobSet(JobSet &&other) :
162  m_queue(other.m_queue),
163  m_jobs(std::move(other.m_jobs)) { other.m_queue = nullptr; }
165  {
166  m_queue = other.m_queue;
167  m_jobs = std::move(other.m_jobs);
168  other.m_queue = nullptr;
169  return *this;
170  }
171 
172  JobSet(const JobSet &) = delete;
173  JobSet &operator=(const JobSet &other) = delete;
174 
175  virtual void Order(Job *job)
176  {
177  auto x = m_jobs.insert(m_queue->Queue(job, this));
178  (void)x; // suppress unused variable warning
179  assert(x.second);
180  }
181  virtual void RemoveJob(Job::Handle *handle) { m_jobs.erase(*handle); }
182 
183  bool IsEmpty() const { return m_jobs.empty(); }
184 
185 private:
186  JobQueue *m_queue;
187  std::set<Job::Handle> m_jobs;
188 };
189 
190 #endif
Definition: JobQueue.h:148
virtual void Order(Job *job)=0
virtual ~JobClient()
Definition: JobQueue.h:152
virtual void RemoveJob(Job::Handle *handle)=0
Definition: JobQueue.h:108
JobQueue(const JobQueue &)=delete
virtual Uint32 FinishJobs()=0
JobQueue & operator=(const JobQueue &)=delete
virtual Job::Handle Queue(Job *job, JobClient *client=nullptr)=0
JobQueue()=default
virtual void Cancel(Job *job)=0
virtual ~JobQueue()
Definition: JobQueue.h:113
Definition: JobQueue.h:157
JobSet & operator=(const JobSet &other)=delete
bool IsEmpty() const
Definition: JobQueue.h:183
virtual void RemoveJob(Job::Handle *handle)
Definition: JobQueue.h:181
JobSet(JobQueue *queue)
Definition: JobQueue.h:159
virtual void Order(Job *job)
Definition: JobQueue.h:175
JobSet & operator=(JobSet &&other)
Definition: JobQueue.h:164
JobSet(JobSet &&other)
Definition: JobQueue.h:161
JobSet(const JobSet &)=delete
Definition: JobQueue.h:39
Handle & operator=(const Handle &)=delete
Job * GetJob() const
Definition: JobQueue.h:54
Handle()
Definition: JobQueue.h:41
bool operator<(const Handle &other) const
Definition: JobQueue.h:56
Handle & operator=(Handle &&other)
Definition: JobQueue.cpp:64
friend class AsyncJobQueue
Definition: JobQueue.h:60
Handle(const Handle &)=delete
~Handle()
Definition: JobQueue.cpp:83
bool HasJob() const
Definition: JobQueue.h:53
Definition: JobQueue.h:33
virtual ~Job()
Definition: JobQueue.cpp:16
friend class JobRunner
Definition: JobQueue.h:91
Job & operator=(const Job &)=delete
Job()
Definition: JobQueue.h:76
virtual void OnFinish()=0
Job(const Job &)=delete
friend class AsyncJobQueue
Definition: JobQueue.h:89
virtual void OnRun()=0
virtual void OnCancel()
Definition: JobQueue.h:86
Definition: JobQueue.h:131
Uint32 RunJobs(Uint32 count=1)
Definition: JobQueue.cpp:159
virtual ~SyncJobQueue()
Definition: JobQueue.cpp:94
virtual Job::Handle Queue(Job *job, JobClient *client=nullptr) override
Definition: JobQueue.cpp:103
SyncJobQueue()=default
virtual void Cancel(Job *job) override
Definition: JobQueue.cpp:132
virtual Uint32 FinishJobs() override
Definition: JobQueue.cpp:111
Definition: TaskGraph.cpp:26
Definition: TaskGraph.h:139