/* drake 1.0.0 — source listing */
00001 /* 00002 Copyright 2015 Nicolas Melot 00003 00004 This file is part of Drake. 00005 00006 Drake is free software: you can redistribute it and/or modify 00007 it under the terms of the GNU General Public License as published by 00008 the Free Software Foundation, either version 3 of the License, or 00009 (at your option) any later version. 00010 00011 Drake is distributed in the hope that it will be useful, 00012 but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00014 GNU General Public License for more details. 00015 00016 You should have received a copy of the GNU General Public License 00017 along with Drake. If not, see <http://www.gnu.org/licenses/>. 00018 00019 */ 00020 00021 #include <stdio.h> 00022 #include <stdarg.h> 00023 #include <unistd.h> 00024 #include <errno.h> 00025 #include <fcntl.h> 00026 #include <time.h> 00027 #include <math.h> 00028 #include <malloc.h> 00029 #include <pthread.h> 00030 #include <string.h> 00031 #include <stdlib.h> 00032 #include <time.h> 00033 #include <execinfo.h> 00034 #include <signal.h> 00035 #include <sys/resource.h> 00036 #include <sys/time.h> 00037 00038 #include <drake/schedule.h> 00039 00040 #if DEBUG 00041 /* get REG_EIP from ucontext.h */ 00042 //#define __USE_GNU 00043 #include <ucontext.h> 00044 #endif 00045 #include <execinfo.h> 00046 00047 #include <pelib/integer.h> 00048 00049 #include <drake/task.h> 00050 #include <drake/link.h> 00051 #include <drake/cross_link.h> 00052 #include <drake/processor.h> 00053 #include <drake/mapping.h> 00054 #include <drake/platform.h> 00055 #include <drake/stream.h> 00056 #include <pelib/monitor.h> 00057 00058 // Debuggin options 00059 #if 1 00060 #define debug(var) printf("[%s:%s:%d:P%zu] %s = \"%s\"\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), #var, var); fflush(NULL) 00061 #define debug_addr(var) printf("[%s:%s:%d:P%zu] %s = \"%p\"\n", __FILE__, __FUNCTION__, __LINE__, 
drake_platform_core_id(), #var, var); fflush(NULL) 00062 #define debug_int(var) printf("[%s:%s:%d:P%zu] %s = \"%d\"\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), #var, var); fflush(NULL) 00063 #define debug_uint(var) printf("[%s:%s:%d:P%zu] %s = \"%u\"\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), #var, var); fflush(NULL) 00064 #define debug_luint(var) printf("[%s:%s:%d:P%zu] %s = \"%lu\"\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), #var, var); fflush(NULL) 00065 #define debug_size_t(var) printf("[%s:%s:%d:P%zu] %s = \"%zu\"\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), #var, var); fflush(NULL) 00066 #else 00067 #define debug(var) 00068 #define debug_addr(var) 00069 #define debug_int(var) 00070 #define debug_size_t(var) 00071 #endif 00072 00073 #define link_name_to_int(name) strcmp((name), "left") == 0 || strcmp((name), "output") == 0 ? 0 : 1 00074 00075 static 00076 int 00077 tasks_mapped_same_cores(task_tp t1, task_tp t2) 00078 { 00079 if(t1->width != t2->width) 00080 { 00081 return 0; 00082 } 00083 else 00084 { 00085 size_t i; 00086 for(i = 0; i < t1->width; i++) 00087 { 00088 if(t1->core[i]->id != t2->core[i]->id) 00089 { 00090 return 0; 00091 } 00092 } 00093 return 1; 00094 } 00095 } 00096 00097 static 00098 void 00099 build_link(mapping_t *mapping, processor_t *proc, task_t *prod, task_t *cons, string prod_name, string cons_name) 00100 { 00101 int i, j; 00102 link_t *link = NULL; 00103 cross_link_t *cross_link; 00104 00105 // Find in target task if this link doesn't already exist 00106 map_iterator_t(string, link_tp) *kk; 00107 for(kk = pelib_map_begin(string, link_tp)(prod->succ); kk != pelib_map_end(string, link_tp)(prod->succ); kk = pelib_map_next(string, link_tp)(kk)) 00108 { 00109 link = pelib_map_read(string, link_tp)(kk).value; 00110 if(link->prod->id == prod->id && link->cons->id == cons->id) 00111 { 00112 break; 00113 } 00114 else 00115 { 00116 link = NULL; 00117 } 00118 } 00119 
00120 // If a link between these two tasks does not exist 00121 if(link == NULL) 00122 { 00123 // Create and initialize a new link 00124 link = (link_t*)drake_platform_private_malloc(sizeof(link_t)); 00125 link->prod = prod; 00126 link->cons = cons; 00127 link->buffer = NULL; 00128 00129 // Add it as source and sink to both current and target tasks 00130 pair_t(string, link_tp) link_prod_pair, link_cons_pair; 00131 pelib_alloc_buffer(string)(&link_prod_pair.key, (strlen(prod_name) + 1) * sizeof(char)); 00132 pelib_alloc_buffer(string)(&link_cons_pair.key, (strlen(cons_name) + 1) * sizeof(char)); 00133 pelib_copy(string)(prod_name, &link_prod_pair.key); 00134 pelib_copy(string)(cons_name, &link_cons_pair.key); 00135 pelib_copy(link_tp)(link, &link_prod_pair.value); 00136 pelib_copy(link_tp)(link, &link_cons_pair.value); 00137 pelib_map_insert(string, link_tp)(prod->succ, link_prod_pair); 00138 pelib_map_insert(string, link_tp)(cons->pred, link_cons_pair); 00139 00140 // If tasks are mapped to different sets of cores 00141 size_t i; 00142 for(i = 0; i < (prod->width < cons->width ? prod->width : cons->width); i++) 00143 { 00144 if(prod->core[i] != cons->core[i]) 00145 { 00146 break; 00147 } 00148 } 00149 00150 // Then create a cross link between these tasks 00151 if(prod->width != cons->width || !tasks_mapped_same_cores(prod, cons)) 00152 //if(prod->width != cons->width || i < (prod->width < cons->width ? 
prod->width : cons->width)) 00153 { 00154 cross_link = (cross_link_t*)drake_platform_private_malloc(sizeof(cross_link_t)); 00155 cross_link->link = link; 00156 cross_link->read = NULL; 00157 cross_link->write = NULL; 00158 cross_link->prod_state = NULL; 00159 cross_link->total_read = 0; 00160 cross_link->total_written = 0; 00161 cross_link->available = 0; 00162 00163 // Add cross-core link to current task and processor 00164 size_t min = pelib_array_length(cross_link_tp)(cons->core[0]->source); 00165 size_t min_index = 0; 00166 for(i = 1; i < cons->width; i++) 00167 { 00168 if(min > pelib_array_length(cross_link_tp)(cons->core[i]->source)) 00169 { 00170 min = pelib_array_length(cross_link_tp)(cons->core[i]->source); 00171 min_index = i; 00172 } 00173 } 00174 pelib_array_append(cross_link_tp)(cons->core[min_index]->source, cross_link); 00175 pelib_array_append(cross_link_tp)(cons->source, cross_link); 00176 00177 // Add cross-core link to target task and its processor 00178 min = pelib_array_length(cross_link_tp)(prod->core[0]->source); 00179 min_index = 0; 00180 for(i = 1; i < prod->width; i++) 00181 { 00182 if(min > pelib_array_length(cross_link_tp)(prod->core[i]->source)) 00183 { 00184 min = pelib_array_length(cross_link_tp)(prod->core[i]->source); 00185 min_index = i; 00186 } 00187 } 00188 pelib_array_append(cross_link_tp)(prod->core[min_index]->sink, cross_link); 00189 pelib_array_append(cross_link_tp)(prod->sink, cross_link); 00190 00191 } 00192 else 00193 { 00194 proc->inner_links++; 00195 } 00196 } 00197 } 00198 00199 static map_t(string, task_tp)* 00200 get_task_consumers(mapping_t *mapping, task_t *task) 00201 { 00202 size_t i; 00203 map_t(string, task_tp) *consumers; 00204 consumers = pelib_alloc(map_t(string, task_tp))(); 00205 pelib_init(map_t(string, task_tp))(consumers); 00206 for(i = 0; i < mapping->schedule->consumers_in_task[task->id - 1]; i++) 00207 { 00208 pair_t(string, task_tp) pair; 00209 string name = 
mapping->schedule->consumers_name[task->id - 1][i]; 00210 pelib_alloc_buffer(string)(&pair.key, strlen(name) + 1); 00211 pelib_copy(string)(name, &pair.key); 00212 pair.value = drake_mapping_find_task(mapping, mapping->schedule->consumers_id[task->id - 1][i]); 00213 pelib_map_insert(string, task_tp)(consumers, pair); 00214 } 00215 00216 return consumers; 00217 } 00218 00219 static map_t(string, task_tp)* 00220 get_task_producers(mapping_t *mapping, task_t *task) 00221 { 00222 size_t i; 00223 map_t(string, task_tp) *producers; 00224 producers = pelib_alloc(map_t(string, task_tp))(); 00225 pelib_init(map_t(string, task_tp))(producers); 00226 for(i = 0; i < mapping->schedule->producers_in_task[task->id - 1]; i++) 00227 { 00228 pair_t(string, task_tp) pair; 00229 string name = mapping->schedule->producers_name[task->id - 1][i]; 00230 pelib_alloc_buffer(string)(&pair.key, strlen(name) + 1); 00231 pelib_copy(string)(name, &pair.key); 00232 pair.value = drake_mapping_find_task(mapping, mapping->schedule->producers_id[task->id - 1][i]); 00233 pelib_map_insert(string, task_tp)(producers, pair); 00234 } 00235 00236 return producers; 00237 } 00238 00239 static 00240 void 00241 build_tree_network(mapping_t* mapping) 00242 { 00243 int i, j; 00244 task_t *target_task, *current_task; 00245 link_t *link; 00246 processor_t *proc; 00247 cross_link_t *cross_link; 00248 00249 for(i = 0; i < mapping->processor_count; i++) 00250 { 00251 proc = mapping->proc[i]; 00252 proc->inner_links = 0; 00253 00254 for(j = 0; j < mapping->proc[i]->handled_nodes; j++) 00255 { 00256 current_task = mapping->proc[i]->task[j]; 00257 00258 map_t(string, task_tp) *producers = get_task_producers(mapping, current_task); 00259 map_iterator_t(string, task_tp)* kk; 00260 for(kk = pelib_map_begin(string, task_tp)(producers); kk != pelib_map_end(string, task_tp)(producers); kk = pelib_map_next(string, task_tp)(kk)) 00261 { 00262 target_task = pelib_map_read(string, task_tp)(kk).value; 00263 string prod_name = 
pelib_map_read(string, task_tp)(kk).key; 00264 // Get link name from the producer's point of view 00265 string cons_name; 00266 size_t l; 00267 for(l = 0; l < mapping->schedule->consumers_in_task[target_task->id - 1]; l++) 00268 { 00269 if(mapping->schedule->consumers_id[target_task->id - 1][l] == current_task->id) 00270 { 00271 cons_name = mapping->schedule->consumers_name[target_task->id - 1][l]; 00272 break; 00273 } 00274 } 00275 00276 if(target_task != NULL) 00277 { 00278 build_link(mapping, proc, target_task, current_task, prod_name, cons_name); 00279 00280 size_t l; 00281 for(l = 0; l < current_task->width; l++) 00282 { 00283 int index = drake_mapping_find_processor_index(mapping, current_task->core[l]->id); 00284 task_t *t = mapping->proc[index]->task[drake_processor_find_task(mapping->proc[index], current_task->id)]; 00285 *t = *current_task; 00286 } 00287 00288 for(l = 0; l < target_task->width; l++) 00289 { 00290 int index = drake_mapping_find_processor_index(mapping, target_task->core[l]->id); 00291 task_t *t = mapping->proc[index]->task[drake_processor_find_task(mapping->proc[index], target_task->id)]; 00292 *t = *target_task; 00293 } 00294 } 00295 } 00296 //pelib_destroy(map_t(string, task_tp))(*producers); 00297 pelib_free(map_t(string, task_tp))(producers); 00298 00299 map_t(string, task_tp) *consumers = get_task_consumers(mapping, current_task); 00300 for(kk = pelib_map_begin(string, task_tp)(consumers); kk != pelib_map_end(string, task_tp)(consumers); kk = pelib_map_next(string, task_tp)(kk)) 00301 { 00302 target_task = pelib_map_read(string, task_tp)(kk).value; 00303 00304 string cons_name = pelib_map_read(string, task_tp)(kk).key; 00305 // Get link name from the producer's point of view 00306 string prod_name; 00307 size_t l; 00308 for(l = 0; l < mapping->schedule->producers_in_task[target_task->id - 1]; l++) 00309 { 00310 if(mapping->schedule->producers_id[target_task->id - 1][l] == current_task->id) 00311 { 00312 prod_name = 
mapping->schedule->producers_name[target_task->id - 1][l]; 00313 break; 00314 } 00315 } 00316 00317 if(target_task != NULL) 00318 { 00319 build_link(mapping, proc, current_task, target_task, prod_name, cons_name); 00320 00321 size_t l; 00322 for(l = 0; l < current_task->width; l++) 00323 { 00324 int index = drake_mapping_find_processor_index(mapping, current_task->core[l]->id); 00325 task_t *t = mapping->proc[index]->task[drake_processor_find_task(mapping->proc[index], current_task->id)]; 00326 *t = *current_task; 00327 } 00328 for(l = 0; l < target_task->width; l++) 00329 { 00330 int index = drake_mapping_find_processor_index(mapping, target_task->core[l]->id); 00331 task_t *t = mapping->proc[index]->task[drake_processor_find_task(mapping->proc[index], target_task->id)]; 00332 *t = *target_task; 00333 } 00334 } 00335 } 00336 //pelib_destroy(map_t(string, task_tp))(*consumers); 00337 pelib_free(map_t(string, task_tp))(consumers); 00338 } 00339 } 00340 } 00341 00342 #if PRINTF_FEEDBACK || PRINTF_PUSH || PRINTF_CHECK_IN || PRINTF_CHECK_OUT 00343 static int 00344 monitor(task_t *task, cross_link_t *link) 00345 { 00346 return 0; 00347 } 00348 int printf_enabled = -1; 00349 #endif 00350 00351 static 00352 void 00353 feedback_link(task_t *task, cross_link_t *link) 00354 { 00355 enum task_status new_state; 00356 size_t size; 00357 00358 size = link->available - pelib_cfifo_length(int)(link->link->buffer); 00359 00360 if(size > 0) 00361 { 00362 #if PRINTF_FEEDBACK 00363 if((printf_enabled & 1) && monitor(task, link)) { 00364 } 00365 #endif 00366 link->total_read += size; 00367 *link->read = link->total_read; 00368 drake_platform_commit(link->read); 00369 link->available = pelib_cfifo_length(int)(link->link->buffer); 00370 #if PRINTF_FEEDBACK 00371 if((printf_enabled & 1) && monitor(task, link)) { 00372 } 00373 #endif 00374 } 00375 } 00376 00377 static 00378 void 00379 push_link(task_t *task, cross_link_t* link) 00380 { 00381 size_t length = 
pelib_cfifo_length(int)(link->link->buffer); 00382 size_t size = length - link->available; 00383 /* 00384 printf("[%s:%s:%d][Task %d %s] Number of elements ready to push: %zu\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, length); 00385 printf("[%s:%s:%d][Task %d %s] Number of elements to be pushed: %zu\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, size); 00386 printf("[%s:%s:%d][Task %d %s] Available elements in memory: %zu\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, link->available); 00387 printf("[%s:%s:%d][Task %d %s] Will push %zu elements\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, size); 00388 printf("[%s:%s:%d][Task %d %s] Writing write counter at address %p\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, link->write); 00389 printf("[%s:%s:%d][Task %d %s] Writing elements at address %p\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, link->link->buffer->buffer); 00390 */ 00391 00392 if(size > 0) 00393 { 00394 #if PRINTF_PUSH 00395 if((printf_enabled & 1) && monitor(task, link)) { 00396 } 00397 #endif 00398 link->total_written += size; 00399 *link->write = link->total_written; 00400 drake_platform_commit(link->write); 00401 link->available = pelib_cfifo_length(int)(link->link->buffer); 00402 #if PRINTF_PUSH 00403 if((printf_enabled & 2) && monitor(task, link)) { 00404 } 00405 #endif 00406 } 00407 00408 // Also send the latest task status 00409 *link->prod_state = task->status; 00410 } 00411 00412 static 00413 void 00414 task_commit(task_t* task) 00415 { 00416 int i; 00417 00418 for(i = 0; i < pelib_array_length(cross_link_tp)(task->source); i++) 00419 { 00420 feedback_link(task, pelib_array_read(cross_link_tp)(task->source, i)); 00421 } 00422 for(i = 0; i < pelib_array_length(cross_link_tp)(task->sink); i++) 00423 { 00424 push_link(task, pelib_array_read(cross_link_tp)(task->sink, i)); 00425 } 00426 } 00427 00428 static 00429 void 00430 check_input_link(task_t *task, 
cross_link_t *link) 00431 { 00432 enum task_status new_state; 00433 drake_platform_pull(link->write); 00434 /* 00435 printf("[%s:%s:%d][Task %d %s] Checking write counter at address %p\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, link->write); 00436 printf("[%s:%s:%d][Task %d %s] %zu elements written\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, *link->write); 00437 printf("[%s:%s:%d][Task %d %s] Reading elements at address %p\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, link->link->buffer->buffer); 00438 */ 00439 size_t write = *link->write - link->total_written; 00440 size_t actual_write = 0; 00441 00442 actual_write = pelib_cfifo_fill(int)(link->link->buffer, write); 00443 link->actual_written = actual_write; 00444 00445 if(actual_write > 0) 00446 { 00447 #if PRINTF_CHECK_IN 00448 if((printf_enabled & 4) && monitor(task, link)) { 00449 } 00450 #endif 00451 link->available = pelib_cfifo_length(int)(link->link->buffer); 00452 link->total_written += write; 00453 #if PRINTF_CHECK_IN 00454 if((printf_enabled & 4) && monitor(task, link)) { 00455 } 00456 #endif 00457 } 00458 00459 new_state = *link->prod_state; 00460 if(new_state != link->link->prod->status) 00461 { 00462 link->link->prod->status = new_state; 00463 #if PRINTF_CHECK_IN 00464 if((printf_enabled & 1) && monitor(task, link)) { 00465 } 00466 #endif 00467 } 00468 } 00469 00470 static 00471 void 00472 check_output_link(task_t *task, cross_link_t *link) 00473 { 00474 enum task_status new_state; 00475 drake_platform_pull(link->read); 00476 size_t read = *link->read - link->total_read; 00477 size_t actual_read = pelib_cfifo_discard(int)(link->link->buffer, read); 00478 link->actual_read = actual_read; 00479 00480 if(actual_read > 0) 00481 { 00482 #if PRINTF_CHECK_OUT 00483 if((printf_enabled & 8) && monitor(task, link)) { 00484 } 00485 #endif 00486 // Keep track of how much was written before work 00487 link->available = pelib_cfifo_length(int)(link->link->buffer); 
00488 link->total_read += read; 00489 #if PRINTF_CHECK_OUT 00490 if((printf_enabled & 8) && monitor(task, link)) { 00491 } 00492 #endif 00493 } 00494 else 00495 { 00496 /* 00497 if(read > 0) 00498 { 00499 } 00500 */ 00501 } 00502 } 00503 00504 static 00505 void 00506 task_check(task_t *task) 00507 { 00508 int i; 00509 00510 for(i = 0; i < pelib_array_length(cross_link_tp)(task->source); i++) 00511 { 00512 check_input_link(task, pelib_array_read(cross_link_tp)(task->source, i)); 00513 } 00514 00515 for(i = 0; i < pelib_array_length(cross_link_tp)(task->sink); i++) 00516 { 00517 check_output_link(task, pelib_array_read(cross_link_tp)(task->sink, i)); 00518 } 00519 } 00520 00521 static 00522 size_t 00523 buffer_size(size_t mpb_size, size_t nb_in, size_t nb_out) 00524 { 00525 #if 0 00526 size_t ret = (nb_in > 0) ? (mpb_size // total allocable MPB size 00527 - nb_in // As many input buffer as 00528 * (sizeof(size_t) // Size of a write 00529 + sizeof(enum task_status) // State of a task 00530 ) 00531 - nb_out // As many output buffers as 00532 * sizeof(size_t) // Size of a read 00533 ) 00534 / nb_in : mpb_size; // Remaining space to be divided by number of input links 00535 #endif 00536 00537 size_t ret = (mpb_size // total allocable MPB size 00538 - nb_in // As many input buffer as 00539 * (sizeof(size_t) // Size of a write 00540 + sizeof(enum task_status) // State of a task 00541 ) 00542 - nb_out // As many output buffers as 00543 * sizeof(size_t) // Size of a read 00544 ) 00545 / nb_in; // Remaining space to be divided by number of input links 00546 00547 return ret; 00548 } 00549 00550 static 00551 void 00552 printf_mpb_allocation(size_t mpb_size, size_t nb_in, size_t nb_out) 00553 { 00554 fprintf(stderr, "MPB size: %zu\n", mpb_size); 00555 fprintf(stderr, "Input links: %zu\n", nb_in); 00556 fprintf(stderr, "Output links: %zu\n", nb_out); 00557 fprintf(stderr, "Total allocable memory per input link: %zu\n", buffer_size(mpb_size, nb_in, nb_out < 1)); 00558 } 00559 
00560 static 00561 void 00562 error_small_mpb(size_t mpb_size, size_t nb_in, size_t nb_out, processor_t *proc) 00563 { 00564 fprintf(stderr, "Too much cross core links at processor %u\n", proc->id); 00565 printf_mpb_allocation(mpb_size, nb_in, nb_out); 00566 abort(); 00567 } 00568 00569 static 00570 int 00571 check_mpb_size(size_t mpb_size, size_t nb_in, size_t nb_out, processor_t *proc) 00572 { 00573 if(buffer_size(mpb_size, nb_in, nb_out) < 1) 00574 { 00575 error_small_mpb(mpb_size, nb_in, nb_out, proc); 00576 return 0; 00577 } 00578 else 00579 { 00580 return 1; 00581 } 00582 } 00583 00584 static 00585 void 00586 allocate_buffers(drake_stream_t* stream) 00587 { 00588 int i, j, k, nb, nb_in, nb_out; 00589 task_t* task; 00590 link_t *link; 00591 cross_link_t *cross_link; 00592 processor_t *proc; 00593 mapping_t *mapping = stream->mapping; 00594 /* 00595 fprintf(stderr, "[%s:%s:%d] %zX\n", __FILE__, __FUNCTION__, __LINE__, stream->stack); 00596 fprintf(stderr, "[%s:%s:%d] %zX\n", __FILE__, __FUNCTION__, __LINE__, stream->stack->base_ptr); 00597 fprintf(stderr, "[%s:%s:%d] %zX\n", __FILE__, __FUNCTION__, __LINE__, stream->stack->stack_ptr); 00598 */ 00599 00600 for(i = 0; i < mapping->processor_count; i++) 00601 { 00602 proc = mapping->proc[i]; 00603 00604 for(j = 0; j < proc->handled_nodes; j++) 00605 { 00606 task = proc->task[j]; 00607 00608 // Take care of all successor links 00609 map_iterator_t(string, link_tp)* kk; 00610 for(kk = pelib_map_begin(string, link_tp)(task->succ); kk != pelib_map_end(string, link_tp)(task->succ); kk = pelib_map_next(string, link_tp)(kk)) 00611 { 00612 link = pelib_map_read(string, link_tp)(kk).value; 00613 // If this is not the root task 00614 if(link->cons != NULL) 00615 { 00616 // If the task is mapped to the same core: inner link 00617 if(tasks_mapped_same_cores(task, link->cons)) 00618 { 00619 if(link->buffer == NULL) 00620 { 00621 size_t capacity = drake_platform_shared_size() / proc->inner_links / sizeof(int); 00622 
link->buffer = pelib_alloc_collection(cfifo_t(int))(capacity); 00623 pelib_init(cfifo_t(int))(link->buffer); 00624 } 00625 } 00626 else 00627 { 00628 size_t l; 00629 for(l = 0; l < link->cons->width; l++) 00630 { 00631 size_t m; 00632 for(m = 0; m < pelib_array_length(cross_link_tp)(link->cons->core[l]->source); m++) 00633 { 00634 if(pelib_array_read(cross_link_tp)(link->cons->core[l]->source, m)->link->prod->id == task->id) 00635 { 00636 break; 00637 } 00638 } 00639 00640 if(m < pelib_array_length(cross_link_tp)(link->cons->core[l]->source)) 00641 { 00642 break; 00643 } 00644 } 00645 if(l == link->cons->width) 00646 { 00647 fprintf(stderr, "[%s:%d:P%zu] Something has gone terribly wrong here\n", __FILE__, __LINE__, drake_platform_core_id()); 00648 abort(); 00649 } 00650 size_t nb_in_succ = pelib_array_length(cross_link_tp)(link->cons->core[l]->source); 00651 size_t nb_out_succ = pelib_array_length(cross_link_tp)(link->cons->core[l]->sink); 00652 check_mpb_size(stream->local_memory_size, nb_in_succ, nb_out_succ, proc); 00653 00654 // If the task is mapped to another core: output link 00655 if(link->buffer == NULL) 00656 { 00657 // Perform this allocation manually 00658 link->buffer = pelib_alloc_struct(cfifo_t(int))(); 00659 int core = link->cons->core[l]->id; 00660 size_t capacity = buffer_size(stream->local_memory_size, nb_in_succ, nb_out_succ) / sizeof(int); 00661 capacity = capacity - (capacity % drake_platform_shared_align()); 00662 link->buffer->buffer = (int*)drake_platform_shared_malloc(sizeof(int) * capacity, core); 00663 link->buffer->capacity = capacity; 00664 pelib_init(cfifo_t(int))(link->buffer); 00665 } 00666 } 00667 } 00668 } 00669 00670 // Take care of all predecessor links 00671 for(kk = pelib_map_begin(string, link_tp)(task->pred); kk != pelib_map_end(string, link_tp)(task->pred); kk = pelib_map_next(string, link_tp)(kk)) 00672 { 00673 link = pelib_map_read(string, link_tp)(kk).value; 00674 00675 // If this is not the root task 00676 
if(link->prod != NULL) 00677 { 00678 // If the task is mapped to the same core: inner link 00679 if(tasks_mapped_same_cores(task, link->prod)) 00680 { 00681 if(link->buffer == NULL) 00682 { 00683 size_t capacity = drake_platform_shared_size() / proc->inner_links / sizeof(int); 00684 link->buffer = pelib_alloc_collection(cfifo_t(int))(capacity); 00685 pelib_init(cfifo_t(int))(link->buffer); 00686 } 00687 } 00688 else 00689 { 00690 size_t l; 00691 for(l = 0; l < link->cons->width; l++) 00692 { 00693 size_t m; 00694 for(m = 0; m < pelib_array_length(cross_link_tp)(link->cons->core[l]->source); m++) 00695 { 00696 if(pelib_array_read(cross_link_tp)(link->cons->core[l]->source, m)->link->prod->id == link->prod->id) 00697 { 00698 break; 00699 } 00700 } 00701 00702 if(m < pelib_array_length(cross_link_tp)(link->cons->core[l]->source)) 00703 { 00704 break; 00705 } 00706 } 00707 00708 nb_in = pelib_array_length(cross_link_tp)(proc->source); 00709 nb_out = pelib_array_length(cross_link_tp)(proc->sink); 00710 check_mpb_size(stream->local_memory_size, nb_in, nb_out, proc); 00711 // If the task is mapped to another core: cross link 00712 if(link->buffer == NULL) 00713 { 00714 // Perform this allocation manually 00715 link->buffer = pelib_alloc_struct(cfifo_t(int))(); 00716 00717 //int core = task->core[l]->id; 00718 int core = link->cons->core[l]->id; 00719 size_t capacity = buffer_size(stream->local_memory_size, nb_in, nb_out) / sizeof(int); 00720 capacity = capacity - (capacity % drake_platform_shared_align()); 00721 //link->buffer->buffer = (int*)drake_remote_addr(stack_grow(stack, sizeof(int) * capacity, core), core); 00722 link->buffer->buffer = (int*)drake_platform_shared_malloc(sizeof(int) * capacity, core); 00723 link->buffer->capacity = capacity; 00724 pelib_init(cfifo_t(int))(link->buffer); 00725 } 00726 } 00727 } 00728 } 00729 00730 // Take care of all output links 00731 for(k = 0; k < pelib_array_length(cross_link_tp)(task->sink); k++) 00732 { 00733 cross_link = 
pelib_array_read(cross_link_tp)(task->sink, k); 00734 00735 if(cross_link->read == NULL) 00736 { 00737 size_t l; 00738 for(l = 0; l < cross_link->link->cons->width; l++) 00739 { 00740 size_t m; 00741 for(m = 0; m < pelib_array_length(cross_link_tp)(cross_link->link->cons->core[l]->source); m++) 00742 { 00743 if(pelib_array_read(cross_link_tp)(cross_link->link->cons->core[l]->source, m)->link->prod->id == cross_link->link->prod->id) 00744 { 00745 break; 00746 } 00747 } 00748 00749 if(m < pelib_array_length(cross_link_tp)(cross_link->link->cons->core[l]->source)) 00750 { 00751 break; 00752 } 00753 } 00754 if(l == cross_link->link->cons->width) 00755 { 00756 fprintf(stderr, "[%s:%d:P%zu] Something has gone terribly wrong here\n", __FILE__, __LINE__, drake_platform_core_id()); 00757 abort(); 00758 } 00759 cross_link->read = (volatile size_t*)drake_platform_shared_malloc_mailbox(sizeof(size_t), cross_link->link->cons->core[l]->id); 00760 00761 *cross_link->read = 0; 00762 } 00763 } 00764 00765 // Take care of all input links 00766 for(k = 0; k < pelib_array_length(cross_link_tp)(task->source); k++) 00767 { 00768 cross_link = pelib_array_read(cross_link_tp)(task->source, k); 00769 size_t l; 00770 for(l = 0; l < cross_link->link->prod->width; l++) 00771 { 00772 size_t m; 00773 for(m = 0; m < pelib_array_length(cross_link_tp)(link->prod->core[l]->sink); m++) 00774 { 00775 if(pelib_array_read(cross_link_tp)(cross_link->link->prod->core[l]->sink, m)->link->prod->id == cross_link->link->prod->id) 00776 { 00777 break; 00778 } 00779 } 00780 00781 if(m < pelib_array_length(cross_link_tp)(cross_link->link->prod->core[l]->sink)) 00782 { 00783 break; 00784 } 00785 } 00786 if(l == cross_link->link->cons->width) 00787 { 00788 fprintf(stderr, "[%s:%d:P%zu] Something has gone terribly wrong here\n", __FILE__, __LINE__, drake_platform_core_id()); 00789 abort(); 00790 } 00791 00792 00793 if(cross_link->prod_state == NULL) 00794 { 00795 cross_link->prod_state = 
(task_status_t*)drake_platform_shared_malloc_mailbox(sizeof(enum task_status), cross_link->link->cons->core[l]->id); 00796 *cross_link->prod_state = TASK_START; 00797 } 00798 00799 if(cross_link->write == NULL) 00800 { 00801 cross_link->write = (volatile size_t*)drake_platform_shared_malloc_mailbox(sizeof(size_t), cross_link->link->cons->core[l]->id); 00802 *cross_link->write = 0; 00803 } 00804 } 00805 } 00806 } 00807 } 00808 00809 #if MONITOR_EXCEPTIONS 00810 00811 task_t *current = NULL; 00812 struct sigaction oldact[5]; 00813 int signal_counter = 0; 00814 void 00815 bt_sighandler(int sig, siginfo_t *info, void *secret) 00816 { 00817 // Restores normal signal catching 00818 sigaction(SIGSEGV, &oldact[0], NULL); 00819 sigaction(SIGUSR1, &oldact[1], NULL); 00820 sigaction(SIGINT, &oldact[2], NULL); 00821 sigaction(SIGFPE, &oldact[3], NULL); 00822 sigaction(SIGTERM, &oldact[4], NULL); 00823 00824 void *array[10]; 00825 size_t size; 00826 char **strings; 00827 size_t i; 00828 00829 size = backtrace (array, 10); 00830 strings = backtrace_symbols (array, size); 00831 00832 if(drake_platform_core_id() == 0) 00833 { 00834 printf("[%s:%s:%d:P%zu] Caught signal %d\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), sig); 00835 switch(sig) 00836 { 00837 case SIGFPE: 00838 { 00839 printf("[%s:%s:%d:P%zu] Caught floating-point exception (SIGFPE)\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id()); 00840 } 00841 break; 00842 case SIGSEGV: 00843 { 00844 printf("[%s:%s:%d:P%zu] Caught segmentation fault exception (SIGSEGV)\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id()); 00845 } 00846 break; 00847 default: 00848 { 00849 printf("[%s:%s:%d:P%zu] Caught unknown signal: %d\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), sig); 00850 } 00851 break; 00852 } 00853 00854 printf("[%s:%s:%d:P%zu] Obtained %zd stack frames.\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), size); 00855 printf("[%s:%s:%d:P%zu] ", __FILE__, 
__FUNCTION__, __LINE__, drake_platform_core_id()); 00856 00857 for (i = 0; i < size; i++) 00858 { 00859 printf("%s ", strings[i]); 00860 } 00861 printf("\n"); 00862 } 00863 00864 abort(); 00865 } 00866 #endif 00867 00868 static mapping_t* 00869 prepare_mapping(drake_schedule_t *schedule) 00870 { 00871 size_t i, j; 00872 mapping_t *mapping; 00873 processor_t *processor = NULL; 00874 00875 size_t num_cores = schedule->core_number; 00876 mapping = pelib_alloc_collection(mapping_t)(num_cores); 00877 mapping->schedule = schedule; 00878 00879 for(j = 1; j <= schedule->core_number; j++) 00880 { 00881 size_t tasks_in_core = schedule->tasks_in_core[j - 1]; 00882 processor = pelib_alloc_collection(processor_t)(tasks_in_core); 00883 processor->id = j - 1; 00884 size_t producers_in_core = schedule->producers_in_core[j - 1]; 00885 size_t consumers_in_core = schedule->consumers_in_core[j - 1]; 00886 processor->source = pelib_alloc_collection(array_t(cross_link_tp))(producers_in_core); 00887 processor->sink = pelib_alloc_collection(array_t(cross_link_tp))(consumers_in_core); 00888 drake_mapping_insert_processor(mapping, processor); 00889 } 00890 00891 size_t task_counter = 0; 00892 task_t *tasks = malloc(sizeof(task_t) * schedule->task_number); 00893 for(j = 1; j <= schedule->core_number; j++) 00894 { 00895 for(i = 1; i <= schedule->tasks_in_core[j - 1]; i++) 00896 { 00897 // Check if this that has been already mapped or not 00898 size_t k; 00899 for(k = 0; k < task_counter; k++) 00900 { 00901 if(tasks[k].id == schedule->schedule[j - 1][i - 1].id) 00902 { 00903 break; 00904 } 00905 } 00906 if(k == task_counter) 00907 { 00908 // This is a new task 00909 task_t task; 00910 task.id = schedule->schedule[j - 1][i - 1].id; 00911 task.name = schedule->task_name[task.id - 1]; 00912 task.core = malloc(sizeof(processor_t*)); 00913 task.core[0] = mapping->proc[j - 1]; 00914 task.width = 1; 00915 tasks[task_counter] = task; 00916 task_counter++; 00917 00918 } 00919 else 00920 { 00921 // We 
update an existing task 00922 processor_t **list = tasks[k].core; 00923 tasks[k].core = malloc(sizeof(processor_t*) * (tasks[k].width + 1)); 00924 memcpy(tasks[k].core, list, sizeof(processor_t*) * tasks[k].width); 00925 tasks[k].core[tasks[k].width] = mapping->proc[j - 1]; 00926 tasks[k].width++; 00927 free(list); 00928 } 00929 } 00930 } 00931 for(j = 1; j <= schedule->core_number; j++) 00932 { 00933 for(i = 1; i <= schedule->tasks_in_core[j - 1]; i++) 00934 { 00935 size_t k; 00936 task_t task; 00937 for(k = 0; k < task_counter; k++) 00938 { 00939 if(tasks[k].id == schedule->schedule[j - 1][i - 1].id) 00940 { 00941 task = tasks[k]; 00942 break; 00943 } 00944 } 00945 if(k == task_counter) 00946 { 00947 fprintf(stderr, "Catastrophic error at %s:%d: trying to create a task that is not in schedule. This is non-sense, aborting.\n", __FILE__, __LINE__); 00948 abort(); 00949 } 00950 task.name = schedule->task_name[task.id - 1]; 00951 task.workload = schedule->task_workload[task.id - 1]; 00952 task.frequency = schedule->schedule[j - 1][i - 1].frequency; 00953 size_t producers_in_task = schedule->producers_in_task[task.id - 1]; 00954 size_t consumers_in_task = schedule->consumers_in_task[task.id - 1]; 00955 size_t remote_producers_in_task = schedule->remote_producers_in_task[task.id - 1]; 00956 size_t remote_consumers_in_task = schedule->remote_consumers_in_task[task.id - 1]; 00957 task.pred = pelib_alloc(map_t(string, link_tp))(); 00958 task.succ = pelib_alloc(map_t(string, link_tp))(); 00959 pelib_init(map_t(string, link_tp))(task.pred); 00960 pelib_init(map_t(string, link_tp))(task.succ); 00961 task.source = pelib_alloc_collection(array_t(cross_link_tp))(remote_producers_in_task); 00962 task.sink = pelib_alloc_collection(array_t(cross_link_tp))(remote_consumers_in_task); 00963 00964 task.status = TASK_START; 00965 drake_mapping_insert_task(mapping, j - 1, &task); 00966 } 00967 } 00968 free(tasks); 00969 00970 return mapping; 00971 } 00972 00973 drake_stream_t 00974 
drake_stream_create_explicit(void (*schedule_init)(drake_schedule_t*), void (*schedule_destroy)(drake_schedule_t*), void* (*task_function)(size_t id, task_status_t status), drake_platform_t pt)
{
	/*
	 * Creates a stream for this core: loads the schedule via schedule_init,
	 * checks it matches the number of cores on this platform (aborts
	 * otherwise), builds the mapping and task network, resolves each local
	 * task's init/start/run/destroy/kill function pointers through
	 * task_function, and allocates the stage timing buffers.
	 * Returns the initialized stream by value.
	 */
	drake_stream_t stream;
	/* NOTE(review): k, outputfile, array, rcce_id, task, link and the
	   start/stop/begin/end counters below are never used in this function —
	   possibly leftovers; confirm before removing. */
	int k;
	char* outputfile;
	array_t(int) *array = NULL;

	// Initialize functions
	stream.schedule_destroy = schedule_destroy;
	mapping_t *mapping;
	int i, j, max_nodes;
	int rcce_id;

	processor_t *proc;
	task_t *task;
	link_t* link;

	unsigned long long int start, stop, begin, end;
	schedule_init(&stream.schedule);
	/* The schedule is compiled for a fixed core count; refuse to run on a
	   platform with a different one. */
	if(stream.schedule.core_number != drake_platform_core_size())
	{
		fprintf(stderr, "Application compiled for different number of cores (%zu) than available on this platform (%zu). Please recompile drake application with a correct platform description\n", stream.schedule.core_number, drake_platform_core_size());
		abort();
	}
	drake_schedule_t *schedule = &stream.schedule;
	drake_platform_barrier(NULL);
	mapping = prepare_mapping(schedule);

	// Build task network based on tasks' id
	build_tree_network(mapping);
	/* Locate this core's processor descriptor in the mapping.
	   NOTE(review): if find_processor_index reports "not found" (e.g. -1),
	   the proc[] access below would be out of bounds — confirm the lookup
	   cannot fail for a schedule that passed the core-count check above. */
	size_t stuff1 = drake_platform_core_id();
	int stuff2 = drake_mapping_find_processor_index(mapping, stuff1);
	proc = mapping->proc[stuff2];

	// Initialize task's pointer
	max_nodes = (proc == NULL ? 0 : proc->handled_nodes);
	for(i = 0; i < max_nodes; i++)
	{
		/* task_function acts as a registry: for each (task id, status) it
		   returns the function implementing that phase for that task. */
		task_t *task = proc->task[i];
		task->status = TASK_START;
		task->init = (int (*)(task_t*, void*))task_function(task->id, TASK_INIT);
		task->start = (int (*)(task_t*))task_function(task->id, TASK_START);
		task->run = (int (*)(task_t*))task_function(task->id, TASK_RUN);
		task->destroy = (int (*)(task_t*))task_function(task->id, TASK_DESTROY);
		task->kill = (int (*)(task_t*))task_function(task->id, TASK_KILLED);
	}

	stream.mapping = mapping;
	stream.proc = proc;
	stream.local_memory_size = drake_platform_shared_size();
	/* Timing buffers used by drake_stream_run to pace pipeline stages. */
	stream.stage_start_time = drake_platform_time_alloc();
	stream.stage_stop_time = drake_platform_time_alloc();
	stream.stage_sleep_time = drake_platform_time_alloc();
	stream.stage_time = drake_platform_time_alloc();
	drake_platform_time_init(stream.stage_time, schedule->stage_time);
	stream.zero = drake_platform_time_alloc();
	drake_platform_time_init(stream.zero, 0);

	stream.platform = pt;

	return stream;
}

/*
 * Runs every local task's init function (passing aux through) after
 * allocating communication buffers. Returns 1 only if all init functions
 * returned non-zero. A core with no task to run is disabled instead.
 * Both branches hit the same barrier so all cores leave together.
 */
int
drake_stream_init(drake_stream_t *stream, void *aux)
{
	int success = 1;
	size_t i;

	if(stream->proc != NULL && stream->proc->handled_nodes > 0)
	{
		size_t max_nodes = stream->proc->handled_nodes;
		allocate_buffers(stream);
		for(i = 0; i < max_nodes; i++)
		{
			task_t *task = stream->proc->task[i];
			int run = task->init(task, aux);
			//int run = 1;
			success = success && run;
		}
		drake_platform_barrier(NULL);
	}
	else
	{
		// Deactivate core if no task is to be run
		drake_platform_core_disable(stream->platform, drake_platform_core_id());
		drake_platform_barrier(NULL);
	}

	return success;
}

/*
 * Destroys a stream: runs each local task's destroy function, then frees
 * the schedule and timing buffers; body continues on the next source line.
 */
int
drake_stream_destroy(drake_stream_t* stream)
{
	// Run the destroy method for each task
01070 task_t *task; 01071 int i; 01072 int success; 01073 mapping_t *mapping = stream->mapping; 01074 processor_t *proc = stream->proc; 01075 for(i = 0; proc != NULL && i < proc->handled_nodes; i++) 01076 { 01077 task = proc->task[i]; 01078 task->status = TASK_DESTROY; 01079 // /!\ Do not invert the operand of the boolean expression below 01080 // Or Drake will not run a destroy method of a task if a previous 01081 // task did not return 1 when destroyed. 01082 success = task->destroy(task) && success; 01083 } 01084 01085 // Free the stream data structures 01086 stream->schedule_destroy(&stream->schedule); 01087 free(stream->stage_start_time); 01088 free(stream->stage_stop_time); 01089 free(stream->stage_sleep_time); 01090 free(stream->stage_time); 01091 free(stream->zero); 01092 01093 // TODO: deallocate buffers, if core had any task to run 01094 01095 return success; 01096 } 01097 01098 int 01099 drake_stream_run(drake_stream_t* stream) 01100 { 01101 unsigned long long int start, stop; 01102 size_t i, j; 01103 task_t *task; 01104 mapping_t *mapping = stream->mapping; 01105 processor_t *proc = stream->proc; 01106 01107 if(proc == NULL) 01108 { 01109 return 0; 01110 } 01111 01112 int active_tasks = proc->handled_nodes; 01113 unsigned long long int begin, end, obegin, oend; 01114 01115 // Make sure everyone starts at the same time 01116 time_t timeref = time(NULL); 01117 int done = 0; 01118 01119 #if MONITOR_EXCEPTIONS 01120 /* Install our signal handler */ 01121 struct sigaction sa; 01122 01123 sa.sa_sigaction = bt_sighandler; 01124 sigemptyset (&sa.sa_mask); 01125 sa.sa_flags = SA_RESTART | SA_SIGINFO; 01126 01127 sigaction(SIGSEGV, &sa, &oldact[0]); 01128 sigaction(SIGUSR1, &sa, &oldact[1]); 01129 sigaction(SIGINT, &sa, &oldact[2]); 01130 sigaction(SIGFPE, &sa, &oldact[3]); 01131 sigaction(SIGTERM, &sa, &oldact[4]); 01132 #endif 01133 01134 // Set frequency of first task 01135 int freq; 01136 if(proc->handled_nodes > 0) 01137 { 01138 freq = 
proc->task[0]->frequency; 01139 drake_platform_set_voltage_frequency(stream->platform, freq); 01140 } 01141 01142 /* Phase 1, real */ 01143 while(active_tasks > 0) 01144 { 01145 // Capture the stage starting time 01146 drake_platform_time_get(stream->stage_start_time); 01147 for(i = 0; i < proc->handled_nodes; i++) 01148 { 01149 task = proc->task[i]; 01150 if(task->status < TASK_KILLED) 01151 { 01152 // Switch frequency 01153 if(freq != task->frequency) 01154 { 01155 freq = task->frequency; 01156 drake_platform_set_voltage_frequency(stream->platform, freq); 01157 } 01158 } 01159 01160 switch(task->status) 01161 { 01162 // Checks input and proceeds to start when first input come 01163 case TASK_START: 01164 done = task->start(task); 01165 task_check(task); 01166 01167 if(task->status == TASK_START && done) 01168 { 01169 task->status = TASK_RUN; 01170 } 01171 01172 // Commit 01173 task_commit(task); 01174 break; 01175 01176 case TASK_RUN: 01177 // Check 01178 task_check(task); 01179 01180 // Work 01181 done = task->run(task); 01182 01183 // Commit 01184 task_commit(task); 01185 01186 if(task->status == TASK_RUN && done) 01187 { 01188 01189 task->status = TASK_KILLED; 01190 } 01191 01192 // Decrement active tasks 01193 case TASK_KILLED: 01194 if(task->status == TASK_KILLED) 01195 { 01196 task->kill(task); 01197 task->status = TASK_ZOMBIE; 01198 active_tasks--; 01199 01200 // Commit 01201 task_commit(task); 01202 } 01203 break; 01204 01205 // Loop until all other tasks are also done 01206 case TASK_ZOMBIE: 01207 if(task->status == TASK_ZOMBIE) 01208 { 01209 task->status = TASK_ZOMBIE; 01210 } 01211 break; 01212 01213 // Should not happen. 
If it does happen, let the scheduler decide what to do 01214 case TASK_INVALID: 01215 task->status = TASK_ZOMBIE; 01216 break; 01217 01218 default: 01219 task->status = TASK_INVALID; 01220 break; 01221 } 01222 } 01223 01224 // Pause until the stage time elapses, if any 01225 if(drake_platform_time_greater(stream->stage_time, stream->zero)) 01226 { 01227 drake_platform_time_get(stream->stage_stop_time); 01228 drake_platform_time_subtract(stream->stage_sleep_time, stream->stage_stop_time, stream->stage_start_time); 01229 if(!drake_platform_time_greater(stream->stage_sleep_time, stream->stage_time)) 01230 { 01231 drake_platform_time_subtract(stream->stage_sleep_time, stream->stage_time, stream->stage_sleep_time); 01232 drake_platform_sleep(stream->stage_sleep_time); 01233 } 01234 } 01235 } 01236 01237 // Get core to sleep 01238 drake_platform_sleep_enable(stream->platform, drake_platform_core_id()); 01239 //drake_platform_core_disable(stream->platform, drake_platform_core_id()); 01240 01241 return 0; 01242 } 01243