SkePU (integrated with StarPU) 0.8.1
 All Classes Namespaces Files Functions Enumerations Friends Macros Groups Pages
mapreduce.h
Go to the documentation of this file.
1 
5 #ifndef MAPREDUCE_H
6 #define MAPREDUCE_H
7 
8 
9 
10 #include <starpu.h>
11 #ifdef SKEPU_OPENCL
12  #include <starpu_opencl.h>
13 #endif
14 
15 
16 
17 #ifdef SKEPU_OPENCL
18 #include <string>
19 #include <vector>
20 #ifdef USE_MAC_OPENCL
21 #include <OpenCL/opencl.h>
22 #else
23 #include <CL/cl.h>
24 #endif
25 #include "src/device_cl.h"
26 #endif
27 
28 #include "src/environment.h"
29 #include "skepu/task.h"
30 #include "skepu/vector.h"
31 #include "src/operator_macros.h"
32 #include "src/exec_plan.h"
33 
34 namespace skepu
35 {
36 
// MapReduce skeleton, primary template. With the default c_type (UNARY) the
// skeleton works on a single input Vector<T> and yields one scalar of type T.
// The BINARY and TERNARY arities are handled by partial specializations
// further down in this header.
//
// Template parameters:
//   MapFunc    - type of the object held in m_mapFunc (its setConstant() is
//                called by setConstant() below).
//   ReduceFunc - type of the object held in m_reduceFunc.
//   T          - element type of the input vector and type of the result.
//   c_type     - arity selector of type skepu::FuncType.
//
// Only declarations and small inline accessors are written here; everything
// else is defined in the src/mapreduce*.inl files included at the end of this
// header.
61 template <typename MapFunc, typename ReduceFunc, typename T, skepu::FuncType c_type=UNARY>
62 class MapReduce: public Task
63 {
64 
65 public:
66 
    // Builds the skeleton from the two user function objects (defined in the .inl files).
67  MapReduce(MapFunc* mapFunc, ReduceFunc* reduceFunc);
68 
    // Same, additionally taking the input vector and the location of the result.
    // NOTE(review): presumably equivalent to calling setInput()/setOutput() afterwards - confirm in mapreduce.inl.
69  MapReduce(MapFunc* mapFunc, ReduceFunc* reduceFunc, Vector<T> *in, T *out);
70 
71  ~MapReduce();
72 
    // Returns the result location as an untyped pointer.
    // If the scalar handle is flagged as in use, it is unregistered first and the
    // flag is cleared, so later getOutput()/setOutput() calls skip the
    // acquire/release path.
73  void *get_result()
74  {
75  if(isScalarStarPU)
76  {
77  starpu_data_unregister(scalar_result_handle); // unregister as this data can be modified
78  isScalarStarPU = false;
79  }
80 
81  return output;
82  }
83 
84 
    // Stores the pointer to the input vector; the vector itself is not copied here.
85  void setInput(Vector<T> *in)
86  {
87  input = in;
88  }
89 
    // Replaces the result location. While the scalar handle is flagged as in use,
    // the pointer swap is bracketed by a write-mode acquire/release of that handle.
    // NOTE(review): only the pointer changes here; the handle is not re-registered
    // for the new location in this function - confirm that run_async() handles it.
90  void setOutput(T *out)
91  {
92  if(isScalarStarPU)
93  starpu_data_acquire(scalar_result_handle, STARPU_W);
94 
95  output = out;
96 
97  if(isScalarStarPU)
98  starpu_data_release(scalar_result_handle);
99  }
100 
    // Returns a copy of the value stored at the result location. While the
    // scalar handle is flagged as in use, the read happens between a read-mode
    // acquire and the matching release; otherwise *output is read directly.
101  T getOutput()
102  {
103  if(isScalarStarPU)
104  {
105  T _output;
106  starpu_data_acquire(scalar_result_handle, STARPU_R); // acquire as we know that it is just a read access…
107  _output = *output;
108  starpu_data_release(scalar_result_handle); // release acquire before returning control…
109  return _output;
110  }
111  return *output;
112  }
113 
    // Declared here, defined in the .inl files.
    // NOTE(review): by its name it submits the work without waiting - confirm.
114  void run_async();
115 
    // Forwards the constant to the map function object.
116  void setConstant(T constant1) {m_mapFunc->setConstant(constant1);}
117 
    // Forwards to Environment::finishAll().
118  void finishAll() {m_environment->finishAll();}
    // Copies the given plan into m_execPlan.
119  void setExecPlan(ExecPlan& plan) {m_execPlan = plan;}
120 
    // State below is public; nothing in this header restricts outside access.
121 public:
122  Environment<int>* m_environment;
123  MapFunc* m_mapFunc;
124  ReduceFunc* m_reduceFunc;
125  ExecPlan m_execPlan;
126 
    // Performance model pointer, present only when USE_STARPU_HISTORY_MODEL is defined.
127 #ifdef USE_STARPU_HISTORY_MODEL
128  starpu_perfmodel *mapred1_perf_model;
129 #endif
130 
131 
132  struct starpu_codelet *codelet;
133 
    // StarPU data handle used by get_result()/setOutput()/getOutput() above.
134  starpu_data_handle_t scalar_result_handle;
135 
    // Guards every use of scalar_result_handle in this header; cleared by get_result().
    // NOTE(review): where it is set to true is not visible here - see the .inl files.
136  bool isScalarStarPU;
137 
138 public:
    // Call operators returning the result by value (defined in the .inl files).
    // NOTE(review): the meaning of `parts` is not visible in this header - confirm.
139  T operator()(Vector<T>& input);
140 
141  T operator()(Vector<T>& input, int parts);
142 
    // Static per-backend entry points; each non-CPU one is compiled only when its
    // SKEPU_* macro is defined.
    // NOTE(review): the (buffers, arg) signature suggests StarPU codelet functions - confirm.
143 public:
144  static void cpu_func(void *buffers[], void *arg);
145 
146 #ifdef SKEPU_OPENMP
147 public:
148  static void omp_func(void *buffers[], void *arg);
149 #endif
150 
151 #ifdef SKEPU_CUDA
152 public:
153  static void cuda_func(void *buffers[], void *arg);
154 #endif
155 
156 #ifdef SKEPU_OPENCL
157 public:
158  static void opencl_func(void *buffers[], void *arg);
159 
    // OpenCL-only state: fixed 1024-byte buffers for the two kernel names and
    // the StarPU OpenCL program object.
160 public:
161  char kernelNameMap[1024];
162  char kernelNameRed[1024];
163 
164  struct starpu_opencl_program opencl_codelet;
165 
    // OpenCL helpers, defined in src/mapreduce_cl.inl.
166  void replaceText(std::string& text, std::string find, std::string replace);
167  void createOpenCLProgram();
168 #endif
169 
170 private:
    // NOTE(review): std::string is used unconditionally here, but this header
    // includes <string> only inside #ifdef SKEPU_OPENCL; presumably it arrives
    // through one of the other includes - confirm.
171  std::string perfmodel_str;
172 
    // Set by setInput(); read by code outside this header.
173  Vector<T> *input;
    // Result location: set by setOutput(), read by getOutput()/get_result().
174  T *output;
    // Not used anywhere in this header; see the .inl files for its role.
175  Vector<T> *temp_output;
176 };
177 
178 
179 
180 
181 
// MapReduce skeleton, partial specialization for skepu::BINARY: same interface
// as the primary template, but with two input vectors (input1, input2) and a
// single scalar result of type T.
//
// Template parameters:
//   MapFunc    - type of the object held in m_mapFunc (its setConstant() is
//                called by setConstant() below).
//   ReduceFunc - type of the object held in m_reduceFunc.
//   T          - element type of the input vectors and type of the result.
//
// Only declarations and small inline accessors are written here; everything
// else is defined in the src/mapreduce*.inl files included at the end of this
// header.
206 template <typename MapFunc, typename ReduceFunc, typename T>
207 class MapReduce<MapFunc, ReduceFunc, T, skepu::BINARY> : public Task
208 {
209 
210 public:
211 
     // Builds the skeleton from the two user function objects (defined in the .inl files).
212  MapReduce(MapFunc* mapFunc, ReduceFunc* reduceFunc);
213 
     // Same, additionally taking both input vectors and the location of the result.
     // NOTE(review): presumably equivalent to calling the setters afterwards - confirm in mapreduce.inl.
214  MapReduce(MapFunc* mapFunc, ReduceFunc* reduceFunc, Vector<T> *in1, Vector<T> *in2, T *out);
215 
216  ~MapReduce();
217 
     // Returns the result location as an untyped pointer.
     // If the scalar handle is flagged as in use, it is unregistered first and the
     // flag is cleared, so later getOutput()/setOutput() calls skip the
     // acquire/release path.
218  void *get_result()
219  {
220  if(isScalarStarPU)
221  {
222  starpu_data_unregister(scalar_result_handle); // unregister as this data can be modified
223  isScalarStarPU = false;
224  }
225 
226  return output;
227  }
228 
229 
     // Stores the pointer to the first input vector; the vector is not copied here.
230  void setInput1(Vector<T> *in)
231  {
232  input1 = in;
233  }
234 
     // Stores the pointer to the second input vector; the vector is not copied here.
235  void setInput2(Vector<T> *in)
236  {
237  input2 = in;
238  }
239 
240 
     // Replaces the result location. While the scalar handle is flagged as in use,
     // the pointer swap is bracketed by a write-mode acquire/release of that handle.
     // NOTE(review): only the pointer changes here; the handle is not re-registered
     // for the new location in this function - confirm that run_async() handles it.
241  void setOutput(T *out)
242  {
243  if(isScalarStarPU)
244  starpu_data_acquire(scalar_result_handle, STARPU_W);
245 
246  output = out;
247 
248  if(isScalarStarPU)
249  starpu_data_release(scalar_result_handle);
250  }
251 
     // Returns a copy of the value stored at the result location. While the
     // scalar handle is flagged as in use, the read happens between a read-mode
     // acquire and the matching release; otherwise *output is read directly.
252  T getOutput()
253  {
254  if(isScalarStarPU)
255  {
256  T _output;
257  starpu_data_acquire(scalar_result_handle, STARPU_R); // acquire as we know that it is just a read access…
258  _output = *output;
259  starpu_data_release(scalar_result_handle); // release acquire before returning control…
260  return _output;
261  }
262  return *output;
263  }
264 
     // Declared here, defined in the .inl files.
     // NOTE(review): by its name it submits the work without waiting - confirm.
265  void run_async();
266 
     // Forwards the constant to the map function object.
267  void setConstant(T constant1) {m_mapFunc->setConstant(constant1);}
268 
     // Forwards to Environment::finishAll().
269  void finishAll() {m_environment->finishAll();}
     // Copies the given plan into m_execPlan.
270  void setExecPlan(ExecPlan& plan) {m_execPlan = plan;}
271 
     // State below is public; nothing in this header restricts outside access.
272 public:
273  Environment<int>* m_environment;
274  MapFunc* m_mapFunc;
275  ReduceFunc* m_reduceFunc;
276  ExecPlan m_execPlan;
277 
     // Performance model pointer, present only when USE_STARPU_HISTORY_MODEL is defined.
278 #ifdef USE_STARPU_HISTORY_MODEL
279  starpu_perfmodel *mapred2_perf_model;
280 #endif
281  struct starpu_codelet *codelet;
282 
     // StarPU data handle used by get_result()/setOutput()/getOutput() above.
283  starpu_data_handle_t scalar_result_handle;
284 
     // Guards every use of scalar_result_handle in this header; cleared by get_result().
     // NOTE(review): where it is set to true is not visible here - see the .inl files.
285  bool isScalarStarPU;
286 
287 public:
     // Call operators taking both inputs and returning the result by value
     // (defined in the .inl files).
     // NOTE(review): the meaning of `parts` is not visible in this header - confirm.
288  T operator()(Vector<T>& input1, Vector<T>& input2);
289 
290  T operator()(Vector<T>& input1, Vector<T>& input2, int parts);
291 
     // Static per-backend entry points; each non-CPU one is compiled only when its
     // SKEPU_* macro is defined.
     // NOTE(review): the (buffers, arg) signature suggests StarPU codelet functions - confirm.
292 public:
293  static void cpu_func(void *buffers[], void *arg);
294 
295 #ifdef SKEPU_OPENMP
296 public:
297  static void omp_func(void *buffers[], void *arg);
298 #endif
299 
300 #ifdef SKEPU_CUDA
301 public:
302  static void cuda_func(void *buffers[], void *arg);
303 #endif
304 
305 #ifdef SKEPU_OPENCL
306 public:
307  static void opencl_func(void *buffers[], void *arg);
308 
     // OpenCL-only state: fixed 1024-byte buffers for the two kernel names and
     // the StarPU OpenCL program object.
309 public:
310  char kernelNameMap[1024];
311  char kernelNameRed[1024];
312 
313  struct starpu_opencl_program opencl_codelet;
314 
     // OpenCL helpers, defined in src/mapreduce_cl.inl.
315  void replaceText(std::string& text, std::string find, std::string replace);
316  void createOpenCLProgram();
317 #endif
318 
319 private:
     // NOTE(review): std::string is used unconditionally here, but this header
     // includes <string> only inside #ifdef SKEPU_OPENCL; presumably it arrives
     // through one of the other includes - confirm.
320  std::string perfmodel_str;
321 
     // Set by setInput1()/setInput2(); read by code outside this header.
322  Vector<T> *input1;
323  Vector<T> *input2;
     // Result location: set by setOutput(), read by getOutput()/get_result().
324  T *output;
     // Not used anywhere in this header; see the .inl files for its role.
325  Vector<T> *temp_output;
326 };
327 
328 
329 
// MapReduce skeleton, partial specialization for TERNARY: same interface as
// the primary template, but with three input vectors (input1, input2, input3)
// and a single scalar result of type T.
//
// Template parameters:
//   MapFunc    - type of the object held in m_mapFunc (its setConstant() is
//                called by setConstant() below).
//   ReduceFunc - type of the object held in m_reduceFunc.
//   T          - element type of the input vectors and type of the result.
//
// Only declarations and small inline accessors are written here; everything
// else is defined in the src/mapreduce*.inl files included at the end of this
// header.
354 template <typename MapFunc, typename ReduceFunc, typename T>
355 class MapReduce<MapFunc, ReduceFunc, T, TERNARY>: public Task
356 {
357 
358 public:
359 
     // Builds the skeleton from the two user function objects (defined in the .inl files).
360  MapReduce(MapFunc* mapFunc, ReduceFunc* reduceFunc);
361 
     // Same, additionally taking the three input vectors and the location of the result.
     // NOTE(review): presumably equivalent to calling the setters afterwards - confirm in mapreduce.inl.
362  MapReduce(MapFunc* mapFunc, ReduceFunc* reduceFunc, Vector<T> *in1, Vector<T> *in2, Vector<T> *in3, T *out);
363 
364  ~MapReduce();
365 
     // Returns the result location as an untyped pointer.
     // If the scalar handle is flagged as in use, it is unregistered first and the
     // flag is cleared, so later getOutput()/setOutput() calls skip the
     // acquire/release path.
366  void *get_result()
367  {
368  if(isScalarStarPU)
369  {
370  starpu_data_unregister(scalar_result_handle); // unregister as this data can be modified
371  isScalarStarPU = false;
372  }
373 
374  return output;
375  }
376 
377 
     // Stores the pointer to the first input vector; the vector is not copied here.
378  void setInput1(Vector<T> *in)
379  {
380  input1 = in;
381  }
382 
     // Stores the pointer to the second input vector; the vector is not copied here.
383  void setInput2(Vector<T> *in)
384  {
385  input2 = in;
386  }
387 
     // Stores the pointer to the third input vector; the vector is not copied here.
388  void setInput3(Vector<T> *in)
389  {
390  input3 = in;
391  }
392 
     // Replaces the result location. While the scalar handle is flagged as in use,
     // the pointer swap is bracketed by a write-mode acquire/release of that handle.
     // NOTE(review): only the pointer changes here; the handle is not re-registered
     // for the new location in this function - confirm that run_async() handles it.
393  void setOutput(T *out)
394  {
395  if(isScalarStarPU)
396  starpu_data_acquire(scalar_result_handle, STARPU_W);
397 
398  output = out;
399 
400  if(isScalarStarPU)
401  starpu_data_release(scalar_result_handle);
402  }
403 
     // Returns a copy of the value stored at the result location. While the
     // scalar handle is flagged as in use, the read happens between a read-mode
     // acquire and the matching release; otherwise *output is read directly.
404  T getOutput()
405  {
406  if(isScalarStarPU)
407  {
408  T _output;
409  starpu_data_acquire(scalar_result_handle, STARPU_R); // acquire as we know that it is just a read access…
410  _output = *output;
411  starpu_data_release(scalar_result_handle); // release acquire before returning control…
412  return _output;
413  }
414  return *output;
415  }
416 
     // Declared here, defined in the .inl files.
     // NOTE(review): by its name it submits the work without waiting - confirm.
417  void run_async();
418 
     // Forwards the constant to the map function object.
419  void setConstant(T constant1) {m_mapFunc->setConstant(constant1);}
420 
     // Forwards to Environment::finishAll().
421  void finishAll() {m_environment->finishAll();}
     // Copies the given plan into m_execPlan.
422  void setExecPlan(ExecPlan& plan) {m_execPlan = plan;}
423 
     // State below is public; nothing in this header restricts outside access.
424 public:
425  Environment<int>* m_environment;
426  MapFunc* m_mapFunc;
427  ReduceFunc* m_reduceFunc;
428  ExecPlan m_execPlan;
429 
     // Performance model pointer, present only when USE_STARPU_HISTORY_MODEL is defined.
430 #ifdef USE_STARPU_HISTORY_MODEL
431  starpu_perfmodel *mapred3_perf_model;
432 #endif
433 
434  struct starpu_codelet *codelet;
435 
     // StarPU data handle used by get_result()/setOutput()/getOutput() above.
436  starpu_data_handle_t scalar_result_handle;
437 
     // Guards every use of scalar_result_handle in this header; cleared by get_result().
     // NOTE(review): where it is set to true is not visible here - see the .inl files.
438  bool isScalarStarPU;
439 
440 public:
     // Call operators taking the three inputs and returning the result by value
     // (defined in the .inl files).
     // NOTE(review): the meaning of `parts` is not visible in this header - confirm.
441  T operator()(Vector<T>& input1, Vector<T>& input2, Vector<T>& input3);
442 
443  T operator()(Vector<T>& input1, Vector<T>& input2, Vector<T>& input3, int parts);
444 
     // Static per-backend entry points; each non-CPU one is compiled only when its
     // SKEPU_* macro is defined.
     // NOTE(review): the (buffers, arg) signature suggests StarPU codelet functions - confirm.
445 public:
446  static void cpu_func(void *buffers[], void *arg);
447 
448 #ifdef SKEPU_OPENMP
449 public:
450  static void omp_func(void *buffers[], void *arg);
451 #endif
452 
453 #ifdef SKEPU_CUDA
454 public:
455  static void cuda_func(void *buffers[], void *arg);
456 #endif
457 
458 #ifdef SKEPU_OPENCL
459 public:
460  static void opencl_func(void *buffers[], void *arg);
461 
     // OpenCL-only state: fixed 1024-byte buffers for the two kernel names and
     // the StarPU OpenCL program object.
462 public:
463  char kernelNameMap[1024];
464  char kernelNameRed[1024];
465 
466  struct starpu_opencl_program opencl_codelet;
467 
     // OpenCL helpers, defined in src/mapreduce_cl.inl.
468  void replaceText(std::string& text, std::string find, std::string replace);
469  void createOpenCLProgram();
470 #endif
471 
472 private:
     // NOTE(review): std::string is used unconditionally here, but this header
     // includes <string> only inside #ifdef SKEPU_OPENCL; presumably it arrives
     // through one of the other includes - confirm.
473  std::string perfmodel_str;
474 
     // Set by setInput1()/setInput2()/setInput3(); read by code outside this header.
475  Vector<T> *input1;
476  Vector<T> *input2;
477  Vector<T> *input3;
     // Result location: set by setOutput(), read by getOutput()/get_result().
478  T *output;
     // Not used anywhere in this header; see the .inl files for its role.
479  Vector<T> *temp_output;
480 };
481 
482 }
483 
484 #include "src/mapreduce.inl"
485 
486 #include "src/mapreduce_cpu.inl"
487 
488 #ifdef SKEPU_OPENMP
489 #include "src/mapreduce_omp.inl"
490 #endif
491 
492 #ifdef SKEPU_OPENCL
493 #include "src/mapreduce_cl.inl"
494 #endif
495 
496 #ifdef SKEPU_CUDA
497 #include "src/mapreduce_cu.inl"
498 #endif
499 
500 #endif
501 
void replaceText(std::string &text, std::string find, std::string replace)
Definition: mapreduce_cl.inl:28
static void opencl_func(void *buffers[], void *arg)
Definition: mapreduce_cl.inl:130
Includes the macro files needed for the defined backends.
static void cuda_func(void *buffers[], void *arg)
Definition: mapreduce_cu.inl:29
void createOpenCLProgram()
Definition: mapreduce_cl.inl:49
void finishAll()
Definition: environment.inl:119
~MapReduce()
Definition: mapreduce.inl:189
static void cpu_func(void *buffers[], void *arg)
Definition: mapreduce_cpu.inl:21
Contains the definitions of non-backend specific functions for the MapReduce skeleton.
Contains a class declaration for the object that represents an OpenCL device.
static void omp_func(void *buffers[], void *arg)
Definition: mapreduce_omp.inl:28
A vector container class, implemented as a wrapper for std::vector. It is configured to use StarPU DS...
Definition: vector.h:40
void run_async()
Definition: mapreduce.inl:215
A class representing the MapReduce skeleton.
Definition: mapreduce.h:62
A class that describes an execution plan, not used very much in this case as decision is mostly left ...
Definition: exec_plan.h:75
Contains the definitions of OpenMP specific functions for the MapReduce skeleton. ...
Contains a class definition for Task.
A class representing a Task for the farm skeleton.
Definition: task.h:31
Contains a class declaration for the Vector container.
Contains the definitions of OpenCL specific functions for the MapReduce skeleton. ...
Contains a class declaration for Environment class.
MapReduce(MapFunc *mapFunc, ReduceFunc *reduceFunc)
Definition: mapreduce.inl:21
Contains the definitions of CUDA specific functions for the MapReduce skeleton.
Contains the definitions of CPU specific functions for the MapReduce skeleton.
T operator()(Vector< T > &input)
Definition: mapreduce.inl:237
Contains a class that stores information about which back ends to use when executing.