// thread_pool.h
#ifndef THREAD_POOL
#define THREAD_POOL


//#ifdef _WIN32
//#else
#include <pthread.h>
#include <semaphore.h>
#include "thread_management.h" // thread management helpers; currently Pthreads only
//#endif


#include <iostream>
#include <stdlib.h>
#include <unistd.h> // needed: usleep() is used by finishAll()/destroyPool()
#include <vector>
#include <map>

namespace skepu
{

typedef void *(*ThreadFunc)(void *);

/*!
 * A unit of work for the pool: the function to call and its argument.
 */
struct WorkerThread
{
    ThreadFunc job_func; /* function to call */
    void *job_arg;       /* its argument */

    virtual unsigned executeThis()
    {
        job_func(job_arg);
        return 0;
    }

    WorkerThread(ThreadFunc job, void *arg): job_func(job), job_arg(arg)
    {
    }

    // Virtual: executeThis() is meant to be overridden, and workers are
    // deleted through a WorkerThread* in threadExecute().
    virtual ~WorkerThread()
    {
        job_func = NULL;
        job_arg = NULL;
    }
};
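/*
 * Illustrative sketch (not from the original sources): a job is any function
 * matching ThreadFunc. The names printHello and msg are hypothetical, chosen
 * only to show how a WorkerThread is constructed.
 *
 *     void *printHello(void *arg)
 *     {
 *         std::cout << (const char *) arg << std::endl;
 *         return NULL;
 *     }
 *
 *     const char *msg = "hello from the pool";
 *     WorkerThread *job = new WorkerThread(&printHello, (void *) msg);
 */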

/*!
 * Used to enable thread pooling while using multiple CUDA devices.
 * The ThreadPool class manages all the ThreadPool-related activities:
 * the worker queue, the pool threads, and their synchronization.
 */
class ThreadPool
{
public:
    ThreadPool();
    ThreadPool(int maxThreads);
    virtual ~ThreadPool();

    void destroyPool(int maxPollMilliSecs = 100);
    void finishAll(int maxPollMilliSecs = 100);

    bool assignWork(WorkerThread *worker);
    bool fetchWork(WorkerThread **worker);

    void initializeThreads();

    static void *threadExecute(void *param);

    static pthread_mutex_t mutexSync;
    static pthread_mutex_t mutexWorkCompletion;

private:
    int maxThreads;

    pthread_cond_t condCrit; // note: unused in this implementation
    sem_t availableWork;     // counts jobs waiting in the queue
    sem_t availableThreads;  // counts free slots in the queue

    std::vector<WorkerThread *> workerQueue;

    int topIndex;    // next slot the producer writes
    int bottomIndex; // next slot a consumer reads

    int incompleteWork; // jobs assigned but not yet completed

    int queueSize;
};
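/*
 * How the pool synchronizes (summary of the code below): workerQueue is a
 * fixed-size ring buffer guarded by mutexSync. Producers wait on
 * availableThreads in assignWork() for a free slot; pool threads wait on
 * availableWork in fetchWork() for a queued job. incompleteWork, guarded by
 * mutexWorkCompletion, counts jobs assigned but not yet finished, and is
 * what finishAll() and destroyPool() poll on.
 */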

// Note: defining these static members in a header produces multiple-definition
// link errors if the header is included from more than one translation unit.
pthread_mutex_t ThreadPool::mutexSync = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t ThreadPool::mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;

// The commented-out variant sized the pool after the number of CUDA devices:
//
// #ifdef SKEPU_CUDA
//    int numDevices = 0;
//    cudaError_t err = cudaGetDeviceCount(&numDevices);
//    ... size the pool with numDevices ...
// #endif

// Delegate to the sizing constructor (requires C++11). The original body
// executed "ThreadPool(2);", which constructs and discards a temporary and
// leaves *this uninitialized.
ThreadPool::ThreadPool()
: ThreadPool(2)
{
}

ThreadPool::ThreadPool(int maxThreads)
{
    if (maxThreads < 1) maxThreads = 1;

    pthread_mutex_lock(&mutexSync);
    this->maxThreads = maxThreads;
    this->queueSize = maxThreads;
    workerQueue.resize(maxThreads, NULL);
    topIndex = 0;
    bottomIndex = 0;
    incompleteWork = 0;
    sem_init(&availableWork, 0, 0);            // no jobs queued yet
    sem_init(&availableThreads, 0, queueSize); // every queue slot is free
    pthread_mutex_unlock(&mutexSync);
}

void ThreadPool::initializeThreads()
{
    for (int i = 0; i < maxThreads; ++i)
    {
        // Thread handles are not stored; the pool never joins its threads.
        pthread_t tempThread;
        pthread_create(&tempThread, NULL, &ThreadPool::threadExecute, (void *) this);
    }
}

ThreadPool::~ThreadPool()
{
    workerQueue.clear();
}


void ThreadPool::finishAll(int maxPollMilliSecs)
{
    // Poll until all assigned jobs have completed.
    while (incompleteWork > 0)
    {
        usleep(maxPollMilliSecs * 1000); // usleep() takes microseconds
    }
}


void ThreadPool::destroyPool(int maxPollMilliSecs)
{
    // Wait for outstanding jobs, then tear down the synchronization objects.
    while (incompleteWork > 0)
    {
        usleep(maxPollMilliSecs * 1000); // usleep() takes microseconds
    }
    sem_destroy(&availableWork);
    sem_destroy(&availableThreads);
    pthread_mutex_destroy(&mutexSync);
    pthread_mutex_destroy(&mutexWorkCompletion);
}


bool ThreadPool::assignWork(WorkerThread *workerThread)
{
    pthread_mutex_lock(&mutexWorkCompletion);
    incompleteWork++;
    pthread_mutex_unlock(&mutexWorkCompletion);

    sem_wait(&availableThreads); // block until a queue slot is free

    pthread_mutex_lock(&mutexSync);
    workerQueue[topIndex] = workerThread;
    if (queueSize != 1)
        topIndex = (topIndex + 1) % queueSize;
    sem_post(&availableWork); // wake a pool thread waiting in fetchWork()
    pthread_mutex_unlock(&mutexSync);
    return true;
}

bool ThreadPool::fetchWork(WorkerThread **workerArg)
{
    sem_wait(&availableWork); // block until a job has been queued

    pthread_mutex_lock(&mutexSync);
    WorkerThread *workerThread = workerQueue[bottomIndex];
    workerQueue[bottomIndex] = NULL;
    *workerArg = workerThread;
    if (queueSize != 1)
        bottomIndex = (bottomIndex + 1) % queueSize;
    sem_post(&availableThreads); // release the queue slot
    pthread_mutex_unlock(&mutexSync);
    return true;
}

void *ThreadPool::threadExecute(void *param)
{
    ThreadPool *pool = (ThreadPool *) param;
    WorkerThread *worker = NULL;

    // Each pool thread loops forever: fetch a job, run it, mark it done.
    while (pool->fetchWork(&worker))
    {
        if (worker)
        {
            worker->executeThis();
            delete worker;
            worker = NULL;
        }

        pthread_mutex_lock(&mutexWorkCompletion);
        pool->incompleteWork--;
        pthread_mutex_unlock(&mutexWorkCompletion);
    }
    return 0;
}

} // namespace skepu


#endif
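/*
 * End-to-end usage sketch (illustrative, not from the original sources):
 * create a pool, start its threads, queue some jobs, and wait for completion.
 * The job function doWork and the job count are hypothetical.
 *
 *     #include "thread_pool.h"
 *
 *     void *doWork(void *arg)
 *     {
 *         int id = *(int *) arg;
 *         std::cout << "job " << id << " done" << std::endl;
 *         return NULL;
 *     }
 *
 *     int main()
 *     {
 *         skepu::ThreadPool pool(4);
 *         pool.initializeThreads();
 *
 *         int ids[8];
 *         for (int i = 0; i < 8; ++i)
 *         {
 *             ids[i] = i;
 *             pool.assignWork(new skepu::WorkerThread(&doWork, &ids[i]));
 *         }
 *
 *         pool.finishAll();   // block until every job has run
 *         pool.destroyPool();
 *         return 0;
 *     }
 *
 * Note that threadExecute() deletes each worker after running it, so jobs
 * must be heap-allocated as above.
 */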