drake  1.0.0
src/stream.c
Go to the documentation of this file.
00001 /*
00002  Copyright 2015 Nicolas Melot
00003 
00004  This file is part of Drake.
00005 
00006  Drake is free software: you can redistribute it and/or modify
00007  it under the terms of the GNU General Public License as published by
00008  the Free Software Foundation, either version 3 of the License, or
00009  (at your option) any later version.
00010 
00011  Drake is distributed in the hope that it will be useful,
00012  but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00014  GNU General Public License for more details.
00015 
00016  You should have received a copy of the GNU General Public License
00017  along with Drake. If not, see <http://www.gnu.org/licenses/>.
00018 
00019 */
00020 
00021 #include <stdio.h>
00022 #include <stdarg.h>
00023 #include <unistd.h>
00024 #include <errno.h>
00025 #include <fcntl.h>
00026 #include <time.h>
00027 #include <math.h>
00028 #include <malloc.h>
00029 #include <pthread.h>
00030 #include <string.h>
00031 #include <stdlib.h>
00032 #include <time.h>
00033 #include <execinfo.h>
00034 #include <signal.h>
00035 #include <sys/resource.h>
00036 #include <sys/time.h>
00037 
00038 #include <drake/schedule.h>
00039 
00040 #if DEBUG
00041 /* get REG_EIP from ucontext.h */
00042 //#define __USE_GNU
00043 #include <ucontext.h>
00044 #endif
00045 #include <execinfo.h>
00046 
00047 #include <pelib/integer.h>
00048 
00049 #include <drake/task.h>
00050 #include <drake/link.h>
00051 #include <drake/cross_link.h>
00052 #include <drake/processor.h>
00053 #include <drake/mapping.h>
00054 #include <drake/platform.h>
00055 #include <drake/stream.h>
00056 #include <pelib/monitor.h>
00057 
// Debugging options. Flip the condition to 0 to compile out all debug output.
#if 1
#define debug(var) printf("[%s:%s:%d:P%zu] %s = \"%s\"\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), #var, var); fflush(NULL)
#define debug_addr(var) printf("[%s:%s:%d:P%zu] %s = \"%p\"\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), #var, var); fflush(NULL)
#define debug_int(var) printf("[%s:%s:%d:P%zu] %s = \"%d\"\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), #var, var); fflush(NULL)
#define debug_uint(var) printf("[%s:%s:%d:P%zu] %s = \"%u\"\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), #var, var); fflush(NULL)
#define debug_luint(var) printf("[%s:%s:%d:P%zu] %s = \"%lu\"\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), #var, var); fflush(NULL)
#define debug_size_t(var) printf("[%s:%s:%d:P%zu] %s = \"%zu\"\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), #var, var); fflush(NULL)
#else
// Empty stubs so debug statements compile away. Previously debug_uint and
// debug_luint were missing here, which broke the build when this branch was active.
#define debug(var)
#define debug_addr(var)
#define debug_int(var)
#define debug_uint(var)
#define debug_luint(var)
#define debug_size_t(var)
#endif
00072 
00073 #define link_name_to_int(name) strcmp((name), "left") == 0 || strcmp((name), "output") == 0 ? 0 : 1
00074 
00075 static
00076 int
00077 tasks_mapped_same_cores(task_tp t1, task_tp t2)
00078 {
00079         if(t1->width != t2->width)
00080         {
00081                 return 0;
00082         }
00083         else
00084         {
00085                 size_t i;
00086                 for(i = 0; i < t1->width; i++)
00087                 {
00088                         if(t1->core[i]->id != t2->core[i]->id)
00089                         {
00090                                 return 0;
00091                         }
00092                 }
00093                 return 1;
00094         }
00095 }
00096 
00097 static
00098 void
00099 build_link(mapping_t *mapping, processor_t *proc, task_t *prod, task_t *cons, string prod_name, string cons_name)
00100 {
00101         int i, j;
00102         link_t *link = NULL;
00103         cross_link_t *cross_link;
00104         
00105         // Find in target task if this link doesn't already exist
00106         map_iterator_t(string, link_tp) *kk;
00107         for(kk = pelib_map_begin(string, link_tp)(prod->succ); kk != pelib_map_end(string, link_tp)(prod->succ); kk = pelib_map_next(string, link_tp)(kk))
00108         {
00109                 link = pelib_map_read(string, link_tp)(kk).value;
00110                 if(link->prod->id == prod->id && link->cons->id == cons->id)
00111                 {
00112                         break;
00113                 }
00114                 else
00115                 {
00116                         link = NULL;
00117                 }
00118         }
00119 
00120         // If a link between these two tasks does not exist
00121         if(link == NULL)
00122         {
00123                 // Create and initialize a new link
00124                 link = (link_t*)drake_platform_private_malloc(sizeof(link_t));
00125                 link->prod = prod;
00126                 link->cons = cons;
00127                 link->buffer = NULL;
00128 
00129                 // Add it as source and sink to both current and target tasks
00130                 pair_t(string, link_tp) link_prod_pair, link_cons_pair;
00131                 pelib_alloc_buffer(string)(&link_prod_pair.key, (strlen(prod_name) + 1) * sizeof(char));
00132                 pelib_alloc_buffer(string)(&link_cons_pair.key, (strlen(cons_name) + 1) * sizeof(char));
00133                 pelib_copy(string)(prod_name, &link_prod_pair.key);
00134                 pelib_copy(string)(cons_name, &link_cons_pair.key);
00135                 pelib_copy(link_tp)(link, &link_prod_pair.value);
00136                 pelib_copy(link_tp)(link, &link_cons_pair.value);
00137                 pelib_map_insert(string, link_tp)(prod->succ, link_prod_pair);
00138                 pelib_map_insert(string, link_tp)(cons->pred, link_cons_pair);
00139 
00140                 // If tasks are mapped to different sets of cores
00141                 size_t i;
00142                 for(i = 0; i < (prod->width < cons->width ? prod->width : cons->width); i++)
00143                 {
00144                         if(prod->core[i] != cons->core[i])
00145                         {
00146                                 break;
00147                         }
00148                 }
00149 
00150                 // Then create a cross link between these tasks
00151                 if(prod->width != cons->width || !tasks_mapped_same_cores(prod, cons))
00152                 //if(prod->width != cons->width || i < (prod->width < cons->width ? prod->width : cons->width))
00153                 {
00154                         cross_link = (cross_link_t*)drake_platform_private_malloc(sizeof(cross_link_t));
00155                         cross_link->link = link;
00156                         cross_link->read = NULL;
00157                         cross_link->write = NULL;
00158                         cross_link->prod_state = NULL;
00159                         cross_link->total_read = 0;
00160                         cross_link->total_written = 0;
00161                         cross_link->available = 0;
00162 
00163                         // Add cross-core link to current task and processor
00164                         size_t min = pelib_array_length(cross_link_tp)(cons->core[0]->source);
00165                         size_t min_index = 0;
00166                         for(i = 1; i < cons->width; i++)
00167                         {
00168                                 if(min > pelib_array_length(cross_link_tp)(cons->core[i]->source))
00169                                 {
00170                                         min = pelib_array_length(cross_link_tp)(cons->core[i]->source);
00171                                         min_index = i;
00172                                 }
00173                         }
00174                         pelib_array_append(cross_link_tp)(cons->core[min_index]->source, cross_link);
00175                         pelib_array_append(cross_link_tp)(cons->source, cross_link);
00176 
00177                         // Add cross-core link to target task and its processor
00178                         min = pelib_array_length(cross_link_tp)(prod->core[0]->source);
00179                         min_index = 0;
00180                         for(i = 1; i < prod->width; i++)
00181                         {
00182                                 if(min > pelib_array_length(cross_link_tp)(prod->core[i]->source))
00183                                 {
00184                                         min = pelib_array_length(cross_link_tp)(prod->core[i]->source);
00185                                         min_index = i;
00186                                 }
00187                         }
00188                         pelib_array_append(cross_link_tp)(prod->core[min_index]->sink, cross_link);
00189                         pelib_array_append(cross_link_tp)(prod->sink, cross_link);
00190 
00191                 }
00192                 else
00193                 {
00194                         proc->inner_links++;
00195                 }
00196         }
00197 }
00198 
00199 static map_t(string, task_tp)*
00200 get_task_consumers(mapping_t *mapping, task_t *task)
00201 {
00202         size_t i;
00203         map_t(string, task_tp) *consumers;
00204         consumers = pelib_alloc(map_t(string, task_tp))();
00205         pelib_init(map_t(string, task_tp))(consumers);
00206         for(i = 0; i < mapping->schedule->consumers_in_task[task->id - 1]; i++)
00207         {
00208                 pair_t(string, task_tp) pair;
00209                 string name = mapping->schedule->consumers_name[task->id - 1][i];
00210                 pelib_alloc_buffer(string)(&pair.key, strlen(name) + 1);
00211                 pelib_copy(string)(name, &pair.key);
00212                 pair.value = drake_mapping_find_task(mapping, mapping->schedule->consumers_id[task->id - 1][i]);
00213                 pelib_map_insert(string, task_tp)(consumers, pair);
00214         }
00215 
00216         return consumers;       
00217 }
00218 
00219 static map_t(string, task_tp)*
00220 get_task_producers(mapping_t *mapping, task_t *task)
00221 {
00222         size_t i;
00223         map_t(string, task_tp) *producers;
00224         producers = pelib_alloc(map_t(string, task_tp))();
00225         pelib_init(map_t(string, task_tp))(producers);
00226         for(i = 0; i < mapping->schedule->producers_in_task[task->id - 1]; i++)
00227         {
00228                 pair_t(string, task_tp) pair;
00229                 string name = mapping->schedule->producers_name[task->id - 1][i];
00230                 pelib_alloc_buffer(string)(&pair.key, strlen(name) + 1);
00231                 pelib_copy(string)(name, &pair.key);
00232                 pair.value = drake_mapping_find_task(mapping, mapping->schedule->producers_id[task->id - 1][i]);
00233                 pelib_map_insert(string, task_tp)(producers, pair);
00234         }
00235 
00236         return producers;       
00237 }
00238 
00239 static
00240 void
00241 build_tree_network(mapping_t* mapping)
00242 {
00243         int i, j;
00244         task_t *target_task, *current_task;
00245         link_t *link;
00246         processor_t *proc;
00247         cross_link_t *cross_link;
00248 
00249         for(i = 0; i < mapping->processor_count; i++)
00250         {
00251                 proc = mapping->proc[i];
00252                 proc->inner_links = 0;
00253 
00254                 for(j = 0; j < mapping->proc[i]->handled_nodes; j++)
00255                 {
00256                         current_task = mapping->proc[i]->task[j];
00257 
00258                         map_t(string, task_tp) *producers = get_task_producers(mapping, current_task);
00259                         map_iterator_t(string, task_tp)* kk;
00260                         for(kk = pelib_map_begin(string, task_tp)(producers); kk != pelib_map_end(string, task_tp)(producers); kk = pelib_map_next(string, task_tp)(kk))
00261                         {
00262                                 target_task = pelib_map_read(string, task_tp)(kk).value;
00263                                 string prod_name = pelib_map_read(string, task_tp)(kk).key;
00264                                 // Get link name from the producer's point of view
00265                                 string cons_name;
00266                                 size_t l;
00267                                 for(l = 0; l < mapping->schedule->consumers_in_task[target_task->id - 1]; l++)
00268                                 {
00269                                         if(mapping->schedule->consumers_id[target_task->id - 1][l] == current_task->id)
00270                                         {
00271                                                 cons_name = mapping->schedule->consumers_name[target_task->id - 1][l];
00272                                                 break;
00273                                         }
00274                                 }
00275 
00276                                 if(target_task != NULL)
00277                                 {
00278                                         build_link(mapping, proc, target_task, current_task, prod_name, cons_name);
00279 
00280                                         size_t l;
00281                                         for(l = 0; l < current_task->width; l++)
00282                                         {
00283                                                 int index = drake_mapping_find_processor_index(mapping, current_task->core[l]->id);
00284                                                 task_t *t = mapping->proc[index]->task[drake_processor_find_task(mapping->proc[index], current_task->id)];
00285                                                 *t = *current_task;
00286                                         }
00287 
00288                                         for(l = 0; l < target_task->width; l++)
00289                                         {
00290                                                 int index = drake_mapping_find_processor_index(mapping, target_task->core[l]->id);
00291                                                 task_t *t = mapping->proc[index]->task[drake_processor_find_task(mapping->proc[index], target_task->id)];
00292                                                 *t = *target_task;
00293                                         }
00294                                 }
00295                         }
00296                         //pelib_destroy(map_t(string, task_tp))(*producers);
00297                         pelib_free(map_t(string, task_tp))(producers);
00298 
00299                         map_t(string, task_tp) *consumers = get_task_consumers(mapping, current_task);
00300                         for(kk = pelib_map_begin(string, task_tp)(consumers); kk != pelib_map_end(string, task_tp)(consumers); kk = pelib_map_next(string, task_tp)(kk))
00301                         {
00302                                 target_task = pelib_map_read(string, task_tp)(kk).value;
00303 
00304                                 string cons_name = pelib_map_read(string, task_tp)(kk).key;
00305                                 // Get link name from the producer's point of view
00306                                 string prod_name;
00307                                 size_t l;
00308                                 for(l = 0; l < mapping->schedule->producers_in_task[target_task->id - 1]; l++)
00309                                 {
00310                                         if(mapping->schedule->producers_id[target_task->id - 1][l] == current_task->id)
00311                                         {
00312                                                 prod_name = mapping->schedule->producers_name[target_task->id - 1][l];
00313                                                 break;
00314                                         }
00315                                 }
00316 
00317                                 if(target_task != NULL)
00318                                 {
00319                                         build_link(mapping, proc, current_task, target_task, prod_name, cons_name);
00320 
00321                                         size_t l;
00322                                         for(l = 0; l < current_task->width; l++)
00323                                         {
00324                                                 int index = drake_mapping_find_processor_index(mapping, current_task->core[l]->id);
00325                                                 task_t *t = mapping->proc[index]->task[drake_processor_find_task(mapping->proc[index], current_task->id)];
00326                                                 *t = *current_task;
00327                                         }
00328                                         for(l = 0; l < target_task->width; l++)
00329                                         {
00330                                                 int index = drake_mapping_find_processor_index(mapping, target_task->core[l]->id);
00331                                                 task_t *t = mapping->proc[index]->task[drake_processor_find_task(mapping->proc[index], target_task->id)];
00332                                                 *t = *target_task;
00333                                         }
00334                                 }
00335                         }
00336                         //pelib_destroy(map_t(string, task_tp))(*consumers);
00337                         pelib_free(map_t(string, task_tp))(consumers);
00338                 }
00339         }
00340 }
00341 
#if PRINTF_FEEDBACK || PRINTF_PUSH || PRINTF_CHECK_IN || PRINTF_CHECK_OUT
/* Tracing hook invoked by the printf-debug blocks below. Always returns 0,
   so the guarded trace bodies never execute; kept as a breakpoint anchor. */
static int
monitor(task_t *task, cross_link_t *link)
{
	return 0;
}
/* Bitmask selecting which trace categories are enabled; -1 enables all bits. */
int printf_enabled = -1;
#endif
00350 
00351 static
00352 void
00353 feedback_link(task_t *task, cross_link_t *link)
00354 {
00355         enum task_status new_state;
00356         size_t size;
00357 
00358         size = link->available - pelib_cfifo_length(int)(link->link->buffer);
00359 
00360         if(size > 0)
00361         {
00362 #if PRINTF_FEEDBACK
00363 if((printf_enabled & 1) && monitor(task, link)) {
00364 }
00365 #endif
00366                 link->total_read += size;
00367                 *link->read = link->total_read;
00368                 drake_platform_commit(link->read);
00369                 link->available = pelib_cfifo_length(int)(link->link->buffer);
00370 #if PRINTF_FEEDBACK
00371 if((printf_enabled & 1) && monitor(task, link)) {
00372 }
00373 #endif
00374         }
00375 }
00376 
static
void
push_link(task_t *task, cross_link_t* link)
{
	/* Producer side: publish to the consumer how many elements have been
	   produced into this cross-core link since the last push, by advancing
	   the shared cumulative write counter, then mirror the task's status. */
	size_t length = pelib_cfifo_length(int)(link->link->buffer);
	// Elements written since 'available' was last recorded.
	size_t size = length - link->available;
	/*
	printf("[%s:%s:%d][Task %d %s] Number of elements ready to push: %zu\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, length);
	printf("[%s:%s:%d][Task %d %s] Number of elements to be pushed: %zu\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, size);
	printf("[%s:%s:%d][Task %d %s] Available elements in memory: %zu\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, link->available);
	printf("[%s:%s:%d][Task %d %s] Will push %zu elements\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, size);
	printf("[%s:%s:%d][Task %d %s] Writing write counter at address %p\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, link->write);
	printf("[%s:%s:%d][Task %d %s] Writing elements at address %p\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, link->link->buffer->buffer);
	*/

	if(size > 0)
	{
#if PRINTF_PUSH
if((printf_enabled & 1) && monitor(task, link)) {
}
#endif
		link->total_written += size;
		// Publish the cumulative counter and flush so the consumer core sees it.
		*link->write = link->total_written;
		drake_platform_commit(link->write);
		link->available = pelib_cfifo_length(int)(link->link->buffer);
#if PRINTF_PUSH
if((printf_enabled & 2) && monitor(task, link)) {
}
#endif
	}

	// Also send the latest task status
	*link->prod_state = task->status;
}
00411 
00412 static
00413 void
00414 task_commit(task_t* task)
00415 {
00416         int i;
00417 
00418         for(i = 0; i < pelib_array_length(cross_link_tp)(task->source); i++)
00419         {
00420                 feedback_link(task, pelib_array_read(cross_link_tp)(task->source, i));
00421         }
00422         for(i = 0; i < pelib_array_length(cross_link_tp)(task->sink); i++)
00423         {
00424                 push_link(task, pelib_array_read(cross_link_tp)(task->sink, i));
00425         }
00426 }
00427 
static
void
check_input_link(task_t *task, cross_link_t *link)
{
	/* Consumer side: poll the producer's shared write counter, account the
	   newly-written elements into the local fifo, and pick up the latest
	   producer task status published alongside the data. */
	enum task_status new_state;
	// Refresh the shared write counter before reading it.
	drake_platform_pull(link->write);
	/*
	printf("[%s:%s:%d][Task %d %s] Checking write counter at address %p\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, link->write);
	printf("[%s:%s:%d][Task %d %s] %zu elements written\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, *link->write);
	printf("[%s:%s:%d][Task %d %s] Reading elements at address %p\n", __FILE__, __FUNCTION__, __LINE__, task->id, task->name, link->link->buffer->buffer);
	*/
	// Elements written by the producer since the last check (cumulative delta).
	size_t write = *link->write - link->total_written;
	size_t actual_write = 0;

	// Account the new elements into the fifo; may accept fewer than 'write'.
	actual_write = pelib_cfifo_fill(int)(link->link->buffer, write);
	link->actual_written = actual_write;

	if(actual_write > 0)
	{
#if PRINTF_CHECK_IN
if((printf_enabled & 4) && monitor(task, link)) {
}
#endif
		link->available = pelib_cfifo_length(int)(link->link->buffer);
		// NOTE(review): advances by 'write', not 'actual_write' — assumes the
		// fifo always accepted everything; confirm against pelib_cfifo_fill.
		link->total_written += write;
#if PRINTF_CHECK_IN
if((printf_enabled & 4) && monitor(task, link)) {
}
#endif
	}

	// Propagate the producer task's latest published run state.
	new_state = *link->prod_state;
	if(new_state != link->link->prod->status)
	{
		link->link->prod->status = new_state;
#if PRINTF_CHECK_IN
		if((printf_enabled & 1) && monitor(task, link)) {
		}
#endif
	}
}
00469 
00470 static
00471 void
00472 check_output_link(task_t *task, cross_link_t *link)
00473 {
00474         enum task_status new_state;
00475         drake_platform_pull(link->read);
00476         size_t read = *link->read - link->total_read;
00477         size_t actual_read = pelib_cfifo_discard(int)(link->link->buffer, read);
00478         link->actual_read = actual_read;
00479 
00480         if(actual_read > 0)
00481         {
00482 #if PRINTF_CHECK_OUT
00483                 if((printf_enabled & 8) && monitor(task, link)) {
00484                 }
00485 #endif
00486                 // Keep track of how much was written before work
00487                 link->available = pelib_cfifo_length(int)(link->link->buffer);
00488                 link->total_read += read;
00489 #if PRINTF_CHECK_OUT
00490                 if((printf_enabled & 8) && monitor(task, link)) {
00491                 }
00492 #endif
00493         }
00494         else
00495         {
00496                 /*
00497                 if(read > 0)
00498                 {
00499                 }
00500                 */
00501         }
00502 }
00503 
00504 static
00505 void
00506 task_check(task_t *task)
00507 {
00508         int i;
00509 
00510         for(i = 0; i < pelib_array_length(cross_link_tp)(task->source); i++)
00511         {
00512                 check_input_link(task, pelib_array_read(cross_link_tp)(task->source, i));
00513         }
00514 
00515         for(i = 0; i < pelib_array_length(cross_link_tp)(task->sink); i++)
00516         {
00517                 check_output_link(task, pelib_array_read(cross_link_tp)(task->sink, i));
00518         }
00519 }
00520 
00521 static
00522 size_t
00523 buffer_size(size_t mpb_size, size_t nb_in, size_t nb_out)
00524 {
00525 #if 0
00526         size_t ret = (nb_in > 0) ? (mpb_size // total allocable MPB size
00527                 - nb_in // As many input buffer as
00528                         * (sizeof(size_t)  // Size of a write
00529                                 + sizeof(enum task_status) // State of a task
00530                         )
00531                 - nb_out // As many output buffers as
00532                         * sizeof(size_t) // Size of a read
00533                 )
00534         / nb_in : mpb_size; // Remaining space to be divided by number of input links
00535 #endif
00536 
00537         size_t ret = (mpb_size // total allocable MPB size
00538                 - nb_in // As many input buffer as
00539                         * (sizeof(size_t)  // Size of a write
00540                                 + sizeof(enum task_status) // State of a task
00541                         )
00542                 - nb_out // As many output buffers as
00543                         * sizeof(size_t) // Size of a read
00544                 )
00545         / nb_in; // Remaining space to be divided by number of input links
00546 
00547         return ret;
00548 }
00549 
static
void
printf_mpb_allocation(size_t mpb_size, size_t nb_in, size_t nb_out)
{
	/* Dump the per-core communication memory budget to stderr. */
	fprintf(stderr, "MPB size: %zu\n", mpb_size);
	fprintf(stderr, "Input links: %zu\n", nb_in);
	fprintf(stderr, "Output links: %zu\n", nb_out);
	// Bug fix: previously passed 'nb_out < 1' — the 0/1 result of a
	// comparison — instead of the actual number of output links.
	fprintf(stderr, "Total allocable memory per input link: %zu\n", buffer_size(mpb_size, nb_in, nb_out));
}
00559 
static
void
error_small_mpb(size_t mpb_size, size_t nb_in, size_t nb_out, processor_t *proc)
{
	/* Fatal: the per-core communication memory cannot fit even one element
	   per input link. Report the memory budget breakdown and abort. */
	// NOTE(review): proc->id printed with %u — assumes an unsigned int id;
	// confirm against processor.h.
	fprintf(stderr, "Too much cross core links at processor %u\n", proc->id);
	printf_mpb_allocation(mpb_size, nb_in, nb_out);
	abort();
}
00568 
00569 static
00570 int
00571 check_mpb_size(size_t mpb_size, size_t nb_in, size_t nb_out, processor_t *proc)
00572 {
00573         if(buffer_size(mpb_size, nb_in, nb_out) < 1)
00574         {
00575                 error_small_mpb(mpb_size, nb_in, nb_out, proc);
00576                 return 0;
00577         }
00578         else
00579         {
00580                 return 1;
00581         }
00582 }
00583 
00584 static
00585 void
00586 allocate_buffers(drake_stream_t* stream)
00587 {
00588         int i, j, k, nb, nb_in, nb_out;
00589         task_t* task;
00590         link_t *link;
00591         cross_link_t *cross_link;
00592         processor_t *proc;
00593         mapping_t *mapping = stream->mapping;
00594         /*
00595         fprintf(stderr, "[%s:%s:%d] %zX\n", __FILE__, __FUNCTION__, __LINE__, stream->stack);
00596         fprintf(stderr, "[%s:%s:%d] %zX\n", __FILE__, __FUNCTION__, __LINE__, stream->stack->base_ptr);
00597         fprintf(stderr, "[%s:%s:%d] %zX\n", __FILE__, __FUNCTION__, __LINE__, stream->stack->stack_ptr);
00598         */
00599 
00600         for(i = 0; i < mapping->processor_count; i++)
00601         {
00602                 proc = mapping->proc[i];
00603 
00604                 for(j = 0; j < proc->handled_nodes; j++)
00605                 {
00606                         task = proc->task[j];
00607 
00608                         // Take care of all successor links
00609                         map_iterator_t(string, link_tp)* kk;
00610                         for(kk = pelib_map_begin(string, link_tp)(task->succ); kk != pelib_map_end(string, link_tp)(task->succ); kk = pelib_map_next(string, link_tp)(kk))
00611                         {
00612                                 link = pelib_map_read(string, link_tp)(kk).value;
00613                                 // If this is not the root task
00614                                 if(link->cons != NULL)
00615                                 {
00616                                         // If the task is mapped to the same core: inner link
00617                                         if(tasks_mapped_same_cores(task, link->cons))
00618                                         {
00619                                                 if(link->buffer == NULL)
00620                                                 {
00621                                                         size_t capacity = drake_platform_shared_size() / proc->inner_links / sizeof(int);
00622                                                         link->buffer = pelib_alloc_collection(cfifo_t(int))(capacity);
00623                                                         pelib_init(cfifo_t(int))(link->buffer);
00624                                                 }
00625                                         }
00626                                         else
00627                                         {
00628                                                 size_t l;
00629                                                 for(l = 0; l < link->cons->width; l++)
00630                                                 {
00631                                                         size_t m;
00632                                                         for(m = 0; m < pelib_array_length(cross_link_tp)(link->cons->core[l]->source); m++)
00633                                                         {
00634                                                                 if(pelib_array_read(cross_link_tp)(link->cons->core[l]->source, m)->link->prod->id == task->id)
00635                                                                 {
00636                                                                         break;
00637                                                                 }
00638                                                         }
00639 
00640                                                         if(m < pelib_array_length(cross_link_tp)(link->cons->core[l]->source))
00641                                                         {
00642                                                                 break;
00643                                                         }
00644                                                 }
00645                                                 if(l == link->cons->width)
00646                                                 {
00647                                                         fprintf(stderr, "[%s:%d:P%zu] Something has gone terribly wrong here\n", __FILE__, __LINE__, drake_platform_core_id());
00648                                                         abort();
00649                                                 }
00650                                                 size_t nb_in_succ = pelib_array_length(cross_link_tp)(link->cons->core[l]->source);
00651                                                 size_t nb_out_succ = pelib_array_length(cross_link_tp)(link->cons->core[l]->sink);
00652                                                 check_mpb_size(stream->local_memory_size, nb_in_succ, nb_out_succ, proc);
00653 
00654                                                 // If the task is mapped to another core: output link
00655                                                 if(link->buffer == NULL)
00656                                                 {
00657                                                         // Perform this allocation manually
00658                                                         link->buffer = pelib_alloc_struct(cfifo_t(int))();
00659                                                         int core = link->cons->core[l]->id;
00660                                                         size_t capacity = buffer_size(stream->local_memory_size, nb_in_succ, nb_out_succ) / sizeof(int);
00661                                                         capacity = capacity - (capacity % drake_platform_shared_align());
00662                                                         link->buffer->buffer = (int*)drake_platform_shared_malloc(sizeof(int) * capacity, core);
00663                                                         link->buffer->capacity = capacity;
00664                                                         pelib_init(cfifo_t(int))(link->buffer);
00665                                                 }
00666                                         }
00667                                 }
00668                         }
00669 
00670                         // Take care of all predecessor links
00671                         for(kk = pelib_map_begin(string, link_tp)(task->pred); kk != pelib_map_end(string, link_tp)(task->pred); kk = pelib_map_next(string, link_tp)(kk))
00672                         {
00673                                 link = pelib_map_read(string, link_tp)(kk).value;
00674 
00675                                 // If this is not the root task
00676                                 if(link->prod != NULL)
00677                                 {
00678                                         // If the task is mapped to the same core: inner link
00679                                         if(tasks_mapped_same_cores(task, link->prod))
00680                                         {
00681                                                 if(link->buffer == NULL)
00682                                                 {
00683                                                         size_t capacity = drake_platform_shared_size() / proc->inner_links / sizeof(int);
00684                                                         link->buffer = pelib_alloc_collection(cfifo_t(int))(capacity);
00685                                                         pelib_init(cfifo_t(int))(link->buffer);
00686                                                 }
00687                                         }
00688                                         else
00689                                         {
00690                                                 size_t l;
00691                                                 for(l = 0; l < link->cons->width; l++)
00692                                                 {
00693                                                         size_t m;
00694                                                         for(m = 0; m < pelib_array_length(cross_link_tp)(link->cons->core[l]->source); m++)
00695                                                         {
00696                                                                 if(pelib_array_read(cross_link_tp)(link->cons->core[l]->source, m)->link->prod->id == link->prod->id)
00697                                                                 {
00698                                                                         break;
00699                                                                 }
00700                                                         }
00701 
00702                                                         if(m < pelib_array_length(cross_link_tp)(link->cons->core[l]->source))
00703                                                         {
00704                                                                 break;
00705                                                         }
00706                                                 }
00707 
00708                                                 nb_in = pelib_array_length(cross_link_tp)(proc->source);
00709                                                 nb_out = pelib_array_length(cross_link_tp)(proc->sink);
00710                                                 check_mpb_size(stream->local_memory_size, nb_in, nb_out, proc);
00711                                                 // If the task is mapped to another core: cross link
00712                                                 if(link->buffer == NULL)
00713                                                 {
00714                                                         // Perform this allocation manually
00715                                                         link->buffer = pelib_alloc_struct(cfifo_t(int))();
00716 
00717                                                         //int core = task->core[l]->id;
00718                                                         int core = link->cons->core[l]->id;
00719                                                         size_t capacity = buffer_size(stream->local_memory_size, nb_in, nb_out) / sizeof(int);
00720                                                         capacity = capacity - (capacity % drake_platform_shared_align());
00721                                                         //link->buffer->buffer = (int*)drake_remote_addr(stack_grow(stack, sizeof(int) * capacity, core), core);
00722                                                         link->buffer->buffer = (int*)drake_platform_shared_malloc(sizeof(int) * capacity, core);
00723                                                         link->buffer->capacity = capacity;
00724                                                         pelib_init(cfifo_t(int))(link->buffer);
00725                                                 }
00726                                         }
00727                                 }
00728                         }
00729 
00730                         // Take care of all output links
00731                         for(k = 0; k < pelib_array_length(cross_link_tp)(task->sink); k++)
00732                         {
00733                                 cross_link = pelib_array_read(cross_link_tp)(task->sink, k);
00734 
00735                                 if(cross_link->read == NULL)
00736                                 {
00737                                         size_t l;
00738                                         for(l = 0; l < cross_link->link->cons->width; l++)
00739                                         {
00740                                                 size_t m;
00741                                                 for(m = 0; m < pelib_array_length(cross_link_tp)(cross_link->link->cons->core[l]->source); m++)
00742                                                 {
00743                                                         if(pelib_array_read(cross_link_tp)(cross_link->link->cons->core[l]->source, m)->link->prod->id == cross_link->link->prod->id)
00744                                                         {
00745                                                                 break;
00746                                                         }
00747                                                 }
00748 
00749                                                 if(m < pelib_array_length(cross_link_tp)(cross_link->link->cons->core[l]->source))
00750                                                 {
00751                                                         break;
00752                                                 }
00753                                         }
00754                                         if(l == cross_link->link->cons->width)
00755                                         {
00756                                                 fprintf(stderr, "[%s:%d:P%zu] Something has gone terribly wrong here\n", __FILE__, __LINE__, drake_platform_core_id());
00757                                                 abort();
00758                                         }
00759                                         cross_link->read = (volatile size_t*)drake_platform_shared_malloc_mailbox(sizeof(size_t), cross_link->link->cons->core[l]->id);
00760                                         
00761                                         *cross_link->read = 0;
00762                                 }
00763                         }
00764 
00765                         // Take care of all input links
00766                         for(k = 0; k < pelib_array_length(cross_link_tp)(task->source); k++)
00767                         {
00768                                 cross_link = pelib_array_read(cross_link_tp)(task->source, k);
00769                                 size_t l;
00770                                 for(l = 0; l < cross_link->link->prod->width; l++)
00771                                 {
00772                                         size_t m;
00773                                         for(m = 0; m < pelib_array_length(cross_link_tp)(link->prod->core[l]->sink); m++)
00774                                         {
00775                                                 if(pelib_array_read(cross_link_tp)(cross_link->link->prod->core[l]->sink, m)->link->prod->id == cross_link->link->prod->id)
00776                                                 {
00777                                                         break;
00778                                                 }
00779                                         }
00780 
00781                                         if(m < pelib_array_length(cross_link_tp)(cross_link->link->prod->core[l]->sink))
00782                                         {
00783                                                 break;
00784                                         }
00785                                 }
00786                                 if(l == cross_link->link->cons->width)
00787                                 {
00788                                         fprintf(stderr, "[%s:%d:P%zu] Something has gone terribly wrong here\n", __FILE__, __LINE__, drake_platform_core_id());
00789                                         abort();
00790                                 }
00791 
00792 
00793                                 if(cross_link->prod_state == NULL)
00794                                 {
00795                                         cross_link->prod_state = (task_status_t*)drake_platform_shared_malloc_mailbox(sizeof(enum task_status), cross_link->link->cons->core[l]->id);
00796                                         *cross_link->prod_state = TASK_START;
00797                                 }
00798 
00799                                 if(cross_link->write == NULL)
00800                                 {
00801                                         cross_link->write = (volatile size_t*)drake_platform_shared_malloc_mailbox(sizeof(size_t), cross_link->link->cons->core[l]->id);
00802                                         *cross_link->write = 0;
00803                                 }
00804                         }
00805                 }
00806         }
00807 }
00808 
#if MONITOR_EXCEPTIONS

// Task being executed on this core; not written in this file — presumably
// updated by the scheduler loop so the handler could report it (currently unused here).
task_t *current = NULL;
// Handlers that were installed before ours, for the five trapped signals.
// Saved by drake_stream_run and restored below before aborting.
struct sigaction oldact[5];
int signal_counter = 0;
// Fatal-signal handler: restore the previous handlers (so a second fault
// cannot recurse into this function), collect a backtrace, print it on
// core 0 only, then abort the whole process.
// NOTE(review): printf and backtrace_symbols (which mallocs) are not
// async-signal-safe; this is tolerated only because the process aborts
// unconditionally afterwards. The strings array is deliberately not freed
// for the same reason.
void
bt_sighandler(int sig, siginfo_t *info, void *secret)
{
	// Restores normal signal catching
	sigaction(SIGSEGV, &oldact[0], NULL);
	sigaction(SIGUSR1, &oldact[1], NULL);
	sigaction(SIGINT, &oldact[2], NULL);
	sigaction(SIGFPE, &oldact[3], NULL);
	sigaction(SIGTERM, &oldact[4], NULL);

	void *array[10];
	size_t size;
	char **strings;
	size_t i;

	// Capture up to 10 frames of the current call stack and symbolize them
	size = backtrace (array, 10);
	strings = backtrace_symbols (array, size);

	// Only core 0 reports, to avoid interleaved output from every core
	if(drake_platform_core_id() == 0)
	{
	printf("[%s:%s:%d:P%zu] Caught signal %d\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), sig);
	switch(sig)
	{
		case SIGFPE:
		{
			printf("[%s:%s:%d:P%zu] Caught floating-point exception (SIGFPE)\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id());
		}
		break;
		case SIGSEGV:
		{
			printf("[%s:%s:%d:P%zu] Caught segmentation fault exception (SIGSEGV)\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id());
		}
		break;
		default:
		{
			printf("[%s:%s:%d:P%zu] Caught unknown signal: %d\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), sig);
		}
		break;
	}
	
	printf("[%s:%s:%d:P%zu] Obtained %zd stack frames.\n", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id(), size);
	printf("[%s:%s:%d:P%zu] ", __FILE__, __FUNCTION__, __LINE__, drake_platform_core_id());

	// One line, space-separated frame descriptions
	for (i = 0; i < size; i++)
	{
		printf("%s ", strings[i]);
	}
	printf("\n");
	}

	abort();
}
#endif
00867 
00868 static mapping_t*
00869 prepare_mapping(drake_schedule_t *schedule)
00870 {
00871         size_t i, j;
00872         mapping_t *mapping;
00873         processor_t *processor = NULL;
00874 
00875         size_t num_cores = schedule->core_number;
00876         mapping = pelib_alloc_collection(mapping_t)(num_cores);
00877         mapping->schedule = schedule;
00878 
00879         for(j = 1; j <= schedule->core_number; j++)
00880         {
00881                 size_t tasks_in_core = schedule->tasks_in_core[j - 1];
00882                 processor = pelib_alloc_collection(processor_t)(tasks_in_core);
00883                 processor->id = j - 1;
00884                 size_t producers_in_core = schedule->producers_in_core[j - 1];
00885                 size_t consumers_in_core = schedule->consumers_in_core[j - 1];
00886                 processor->source = pelib_alloc_collection(array_t(cross_link_tp))(producers_in_core);
00887                 processor->sink = pelib_alloc_collection(array_t(cross_link_tp))(consumers_in_core);
00888                 drake_mapping_insert_processor(mapping, processor);
00889         }
00890 
00891         size_t task_counter = 0;
00892         task_t *tasks = malloc(sizeof(task_t) * schedule->task_number); 
00893         for(j = 1; j <= schedule->core_number; j++)
00894         {
00895                 for(i = 1; i <= schedule->tasks_in_core[j - 1]; i++)
00896                 {
00897                         // Check if this that has been already mapped or not
00898                         size_t k;
00899                         for(k = 0; k < task_counter; k++)
00900                         {
00901                                 if(tasks[k].id == schedule->schedule[j - 1][i - 1].id)
00902                                 {
00903                                         break;
00904                                 }
00905                         }
00906                         if(k == task_counter)
00907                         {
00908                                 // This is a new task
00909                                 task_t task;
00910                                 task.id = schedule->schedule[j - 1][i - 1].id;
00911                                 task.name = schedule->task_name[task.id - 1];
00912                                 task.core = malloc(sizeof(processor_t*));
00913                                 task.core[0] = mapping->proc[j - 1];
00914                                 task.width = 1;
00915                                 tasks[task_counter] = task;
00916                                 task_counter++;
00917 
00918                         }
00919                         else
00920                         {
00921                                 // We update an existing task
00922                                 processor_t **list = tasks[k].core;
00923                                 tasks[k].core = malloc(sizeof(processor_t*) * (tasks[k].width + 1));
00924                                 memcpy(tasks[k].core, list, sizeof(processor_t*) * tasks[k].width);
00925                                 tasks[k].core[tasks[k].width] = mapping->proc[j - 1];
00926                                 tasks[k].width++;
00927                                 free(list);
00928                         }
00929                 }
00930         }
00931         for(j = 1; j <= schedule->core_number; j++)
00932         {
00933                 for(i = 1; i <= schedule->tasks_in_core[j - 1]; i++)
00934                 {
00935                         size_t k;
00936                         task_t task;
00937                         for(k = 0; k < task_counter; k++)
00938                         {
00939                                 if(tasks[k].id == schedule->schedule[j - 1][i - 1].id)
00940                                 {
00941                                         task = tasks[k];
00942                                         break;
00943                                 }
00944                         }
00945                         if(k == task_counter)
00946                         {
00947                                 fprintf(stderr, "Catastrophic error at %s:%d: trying to create a task that is not in schedule. This is non-sense, aborting.\n", __FILE__, __LINE__);
00948                                 abort();
00949                         }
00950                         task.name = schedule->task_name[task.id - 1];
00951                         task.workload = schedule->task_workload[task.id - 1];
00952                         task.frequency = schedule->schedule[j - 1][i - 1].frequency;
00953                         size_t producers_in_task = schedule->producers_in_task[task.id - 1];
00954                         size_t consumers_in_task = schedule->consumers_in_task[task.id - 1];
00955                         size_t remote_producers_in_task = schedule->remote_producers_in_task[task.id - 1];
00956                         size_t remote_consumers_in_task = schedule->remote_consumers_in_task[task.id - 1];
00957                         task.pred = pelib_alloc(map_t(string, link_tp))();
00958                         task.succ = pelib_alloc(map_t(string, link_tp))();
00959                         pelib_init(map_t(string, link_tp))(task.pred);
00960                         pelib_init(map_t(string, link_tp))(task.succ);
00961                         task.source = pelib_alloc_collection(array_t(cross_link_tp))(remote_producers_in_task);
00962                         task.sink = pelib_alloc_collection(array_t(cross_link_tp))(remote_consumers_in_task);
00963 
00964                         task.status = TASK_START;
00965                         drake_mapping_insert_task(mapping, j - 1, &task);
00966                 }
00967         }
00968         free(tasks);
00969 
00970         return mapping;
00971 }
00972 
00973 drake_stream_t
00974 drake_stream_create_explicit(void (*schedule_init)(drake_schedule_t*), void (*schedule_destroy)(drake_schedule_t*), void* (*task_function)(size_t id, task_status_t status), drake_platform_t pt)
00975 {
00976         drake_stream_t stream;
00977         int k;
00978         char* outputfile;
00979         array_t(int) *array = NULL;
00980 
00981         // Initialize functions
00982         stream.schedule_destroy = schedule_destroy;
00983         mapping_t *mapping;
00984         int i, j, max_nodes;
00985         int rcce_id;
00986 
00987         processor_t *proc;
00988         task_t *task;
00989         link_t* link;
00990 
00991         unsigned long long int start, stop, begin, end;
00992         schedule_init(&stream.schedule);
00993         if(stream.schedule.core_number != drake_platform_core_size())
00994         {
00995                 fprintf(stderr, "Application compiled for different number of cores (%zu) than available on this platform (%zu). Please recompile drake application with a correct platform description\n", stream.schedule.core_number, drake_platform_core_size());
00996                 abort();
00997         }
00998         drake_schedule_t *schedule = &stream.schedule;
00999         drake_platform_barrier(NULL);
01000         mapping = prepare_mapping(schedule);
01001 
01002         // Build task network based on tasks' id
01003         build_tree_network(mapping);
01004         size_t stuff1 = drake_platform_core_id();
01005         int stuff2 = drake_mapping_find_processor_index(mapping, stuff1);
01006         proc = mapping->proc[stuff2];
01007 
01008         // Initialize task's pointer
01009         max_nodes = (proc == NULL ? 0 : proc->handled_nodes);
01010         for(i = 0; i < max_nodes; i++)
01011         {
01012                 task_t *task = proc->task[i];
01013                 task->status = TASK_START;
01014                 task->init = (int (*)(task_t*, void*))task_function(task->id, TASK_INIT);
01015                 task->start = (int (*)(task_t*))task_function(task->id, TASK_START);
01016                 task->run = (int (*)(task_t*))task_function(task->id, TASK_RUN);
01017                 task->destroy = (int (*)(task_t*))task_function(task->id, TASK_DESTROY);
01018                 task->kill = (int (*)(task_t*))task_function(task->id, TASK_KILLED);
01019         }
01020 
01021         stream.mapping = mapping;
01022         stream.proc = proc;
01023         stream.local_memory_size = drake_platform_shared_size();
01024         stream.stage_start_time = drake_platform_time_alloc();
01025         stream.stage_stop_time = drake_platform_time_alloc();
01026         stream.stage_sleep_time = drake_platform_time_alloc();
01027         stream.stage_time = drake_platform_time_alloc();
01028         drake_platform_time_init(stream.stage_time, schedule->stage_time);
01029         stream.zero = drake_platform_time_alloc();
01030         drake_platform_time_init(stream.zero, 0);
01031 
01032         stream.platform = pt;
01033 
01034         return stream;
01035 }
01036 
01037 int
01038 drake_stream_init(drake_stream_t *stream, void *aux)
01039 {
01040         int success = 1;
01041         size_t i;
01042 
01043         if(stream->proc != NULL && stream->proc->handled_nodes > 0)
01044         {
01045                 size_t max_nodes = stream->proc->handled_nodes;
01046                 allocate_buffers(stream);
01047                 for(i = 0; i < max_nodes; i++)
01048                 {
01049                         task_t *task = stream->proc->task[i];
01050                         int run = task->init(task, aux);
01051                         //int run = 1;
01052                         success = success && run;               
01053                 }
01054                 drake_platform_barrier(NULL);
01055         }
01056         else
01057         {
01058                 // Deactivate core is no task is to be run
01059                 drake_platform_core_disable(stream->platform, drake_platform_core_id());
01060                 drake_platform_barrier(NULL);
01061         }
01062 
01063         return success;
01064 }
01065 
01066 int
01067 drake_stream_destroy(drake_stream_t* stream)
01068 {
01069         // Run the destroy method for each task
01070         task_t *task;
01071         int i;
01072         int success;
01073         mapping_t *mapping = stream->mapping;
01074         processor_t *proc = stream->proc;
01075         for(i = 0; proc != NULL && i < proc->handled_nodes; i++)
01076         {
01077                 task = proc->task[i];
01078                 task->status = TASK_DESTROY;
01079                 // /!\ Do not invert the operand of the boolean expression below
01080                 // Or Drake will not run a destroy method of a task if a previous
01081                 // task did not return 1 when destroyed.
01082                 success = task->destroy(task) && success;
01083         }
01084 
01085         // Free the stream data structures
01086         stream->schedule_destroy(&stream->schedule);
01087         free(stream->stage_start_time);
01088         free(stream->stage_stop_time);
01089         free(stream->stage_sleep_time);
01090         free(stream->stage_time);
01091         free(stream->zero);
01092 
01093         // TODO: deallocate buffers, if core had any task to run
01094 
01095         return success;
01096 }
01097 
01098 int
01099 drake_stream_run(drake_stream_t* stream)
01100 {
01101         unsigned long long int start, stop;
01102         size_t i, j;
01103         task_t *task;
01104         mapping_t *mapping = stream->mapping;
01105         processor_t *proc = stream->proc;
01106 
01107         if(proc == NULL)
01108         {
01109                 return 0;
01110         }
01111 
01112         int active_tasks = proc->handled_nodes;
01113         unsigned long long int begin, end, obegin, oend;
01114 
01115         // Make sure everyone starts at the same time
01116         time_t timeref = time(NULL);
01117         int done = 0;
01118 
01119 #if MONITOR_EXCEPTIONS
01120         /* Install our signal handler */
01121         struct sigaction sa;
01122 
01123         sa.sa_sigaction = bt_sighandler;
01124         sigemptyset (&sa.sa_mask);
01125         sa.sa_flags = SA_RESTART | SA_SIGINFO;
01126 
01127         sigaction(SIGSEGV, &sa, &oldact[0]);
01128         sigaction(SIGUSR1, &sa, &oldact[1]);
01129         sigaction(SIGINT, &sa, &oldact[2]);
01130         sigaction(SIGFPE, &sa, &oldact[3]);
01131         sigaction(SIGTERM, &sa, &oldact[4]);
01132 #endif
01133 
01134         // Set frequency of first task
01135         int freq;
01136         if(proc->handled_nodes > 0)
01137         {
01138                 freq = proc->task[0]->frequency;
01139                 drake_platform_set_voltage_frequency(stream->platform, freq);
01140         }
01141 
01142         /* Phase 1, real */
01143         while(active_tasks > 0)
01144         {
01145                 // Capture the stage starting time
01146                 drake_platform_time_get(stream->stage_start_time);
01147                 for(i = 0; i < proc->handled_nodes; i++)
01148                 {
01149                         task = proc->task[i];
01150                         if(task->status < TASK_KILLED)
01151                         {
01152                                 // Switch frequency
01153                                 if(freq != task->frequency)
01154                                 {
01155                                         freq = task->frequency;
01156                                         drake_platform_set_voltage_frequency(stream->platform, freq);
01157                                 }
01158                         }
01159 
01160                         switch(task->status)
01161                         {
01162                                 // Checks input and proceeds to start when first input come
01163                                 case TASK_START:
01164                                         done = task->start(task);
01165                                         task_check(task);
01166 
01167                                         if(task->status == TASK_START && done)
01168                                         {
01169                                                 task->status = TASK_RUN;
01170                                         }
01171 
01172                                         // Commit
01173                                         task_commit(task);
01174                                 break;
01175 
01176                                 case TASK_RUN:
01177                                         // Check
01178                                         task_check(task);
01179 
01180                                         // Work
01181                                         done = task->run(task);
01182 
01183                                         // Commit
01184                                         task_commit(task);
01185 
01186                                         if(task->status == TASK_RUN && done)
01187                                         {
01188                                                 
01189                                                 task->status = TASK_KILLED;
01190                                         }
01191 
01192                                 // Decrement active tasks
01193                                 case TASK_KILLED:
01194                                         if(task->status == TASK_KILLED)
01195                                         {
01196                                                 task->kill(task);
01197                                                 task->status = TASK_ZOMBIE;
01198                                                 active_tasks--;
01199 
01200                                                 // Commit
01201                                                 task_commit(task);
01202                                         }
01203                                 break;
01204 
01205                                 // Loop until all other tasks are also done
01206                                 case TASK_ZOMBIE:
01207                                         if(task->status == TASK_ZOMBIE)
01208                                         {
01209                                                 task->status = TASK_ZOMBIE;
01210                                         }
01211                                 break;
01212 
01213                                 // Should not happen. If it does happen, let the scheduler decide what to do
01214                                 case TASK_INVALID:
01215                                         task->status = TASK_ZOMBIE;
01216                                 break;
01217 
01218                                 default:
01219                                         task->status = TASK_INVALID;
01220                                 break;
01221                         }
01222                 }
01223 
01224                 // Pause until the stage time elapses, if any
01225                 if(drake_platform_time_greater(stream->stage_time, stream->zero))
01226                 {
01227                         drake_platform_time_get(stream->stage_stop_time);
01228                         drake_platform_time_subtract(stream->stage_sleep_time, stream->stage_stop_time, stream->stage_start_time);
01229                         if(!drake_platform_time_greater(stream->stage_sleep_time, stream->stage_time))
01230                         {
01231                                 drake_platform_time_subtract(stream->stage_sleep_time, stream->stage_time, stream->stage_sleep_time);
01232                                 drake_platform_sleep(stream->stage_sleep_time);
01233                         }
01234                 }
01235         }
01236 
01237         // Get core to sleep
01238         drake_platform_sleep_enable(stream->platform, drake_platform_core_id());
01239         //drake_platform_core_disable(stream->platform, drake_platform_core_id());
01240 
01241         return 0;
01242 }
01243