From e0bbbbcab8f43318faddeb4915897936cec88918 Mon Sep 17 00:00:00 2001 From: mutantturkey Date: Wed, 20 Jun 2012 14:36:45 -0400 Subject: updated the readme to include the new utility, mask-generator. Added to mask-generator to the make file, renamed the folder for mask-generator to mask, added proper headers to remove warnings, fixed a return type and removed an unused variable called status --- fly-tools/ThreadedFilterFlyMask/thpool.c | 283 ------------------------------- 1 file changed, 283 deletions(-) delete mode 100644 fly-tools/ThreadedFilterFlyMask/thpool.c (limited to 'fly-tools/ThreadedFilterFlyMask/thpool.c') diff --git a/fly-tools/ThreadedFilterFlyMask/thpool.c b/fly-tools/ThreadedFilterFlyMask/thpool.c deleted file mode 100644 index 5d7cbe3..0000000 --- a/fly-tools/ThreadedFilterFlyMask/thpool.c +++ /dev/null @@ -1,283 +0,0 @@ -/* ******************************** - * - * Author: Johan Hanssen Seferidis - * Date: 12/08/2011 - * Update: 01/11/2011 - * License: LGPL - * - * - *//** @file thpool.h *//* - ********************************/ - -/* Library providing a threading pool where you can add work. For an example on - * usage you refer to the main file found in the same package */ - -/* - * Fast reminders: - * - * tp = threadpool - * thpool = threadpool - * thpool_t = threadpool type - * tp_p = threadpool pointer - * sem = semaphore - * xN = x can be any string. N stands for amount - * - * */ - -#include -#include -#include -#include -#include - -#include "thpool.h" /* here you can also find the interface to each function */ - - -static int thpool_keepalive=1; - -/* Create mutex variable */ -pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* used to serialize queue access */ - - - - -/* Initialise thread pool */ -thpool_t* thpool_init(int threadsN){ - thpool_t* tp_p; - - if (!threadsN || threadsN<1) threadsN=1; - - /* Make new thread pool */ - tp_p=(thpool_t*)malloc(sizeof(thpool_t)); /* MALLOC thread pool */ - if (tp_p==NULL){ - fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n"); - return NULL; - } - tp_p->threads=(pthread_t*)malloc(threadsN*sizeof(pthread_t)); /* MALLOC thread IDs */ - if (tp_p->threads==NULL){ - fprintf(stderr, "thpool_init(): Could not allocate memory for thread IDs\n"); - return NULL; - } - tp_p->threadsN=threadsN; - - /* Initialise the job queue */ - if (thpool_jobqueue_init(tp_p)==-1){ - fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n"); - return NULL; - } - - /* Initialise semaphore*/ - tp_p->jobqueue->queueSem=(sem_t*)malloc(sizeof(sem_t)); /* MALLOC job queue semaphore */ - sem_init(tp_p->jobqueue->queueSem, 0, 0); /* no shared, initial value */ - - /* Make threads in pool */ - int t; - for (t=0; tthreads[t]), NULL, (void *)thpool_thread_do, (void *)tp_p); /* MALLOCS INSIDE PTHREAD HERE */ - } - - return tp_p; -} - - -/* What each individual thread is doing - * */ -/* There are two scenarios here. One is everything works as it should and second if - * the thpool is to be killed. In that manner we try to BYPASS sem_wait and end each thread. */ -void thpool_thread_do(thpool_t* tp_p){ - - while(thpool_keepalive){ - - if (sem_wait(tp_p->jobqueue->queueSem)) {/* WAITING until there is work in the queue */ - perror("thpool_thread_do(): Waiting for semaphore"); - exit(1); - } - - if (thpool_keepalive){ - - /* Read job from queue and execute it */ - void*(*func_buff)(void* arg); - void* arg_buff; - thpool_job_t* job_p; - - pthread_mutex_lock(&mutex); /* LOCK */ - - job_p = thpool_jobqueue_peek(tp_p); - func_buff=job_p->function; - arg_buff =job_p->arg; - thpool_jobqueue_removelast(tp_p); - - pthread_mutex_unlock(&mutex); /* UNLOCK */ - - func_buff(arg_buff); /* run function */ - free(job_p); /* DEALLOC job */ - } - else - { - return; /* EXIT thread*/ - } - } - return; -} - - -/* Add work to the thread pool */ -int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p){ - thpool_job_t* newJob; - - newJob=(thpool_job_t*)malloc(sizeof(thpool_job_t)); /* MALLOC job */ - if (newJob==NULL){ - fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n"); - exit(1); - } - - /* add function and argument */ - newJob->function=function_p; - newJob->arg=arg_p; - - /* add job to queue */ - pthread_mutex_lock(&mutex); /* LOCK */ - thpool_jobqueue_add(tp_p, newJob); - pthread_mutex_unlock(&mutex); /* UNLOCK */ -} - - -/* Destroy the threadpool */ -void thpool_destroy(thpool_t* tp_p){ - int t; - - while(tp_p->jobqueue->jobsN > 0) { - usleep(500000); - } - /* End each thread's infinite loop */ - thpool_keepalive=0; - - /* Awake idle threads waiting at semaphore */ - for (t=0; t<(tp_p->threadsN); t++){ - if (sem_post(tp_p->jobqueue->queueSem)){ - fprintf(stderr, "thpool_destroy(): Could not bypass sem_wait()\n"); - } - } - - /* Kill semaphore */ - if (sem_destroy(tp_p->jobqueue->queueSem)!=0){ - fprintf(stderr, "thpool_destroy(): Could not destroy semaphore\n"); - } - - /* Wait for threads to finish */ - for (t=0; t<(tp_p->threadsN); t++){ - pthread_join(tp_p->threads[t], NULL); - } - - thpool_jobqueue_empty(tp_p); - - /* Dealloc */ - free(tp_p->threads); /* DEALLOC threads */ - free(tp_p->jobqueue->queueSem); /* DEALLOC job queue semaphore */ - free(tp_p->jobqueue); /* DEALLOC job queue */ - free(tp_p); /* DEALLOC thread pool */ -} - - - -/* =================== JOB QUEUE OPERATIONS ===================== */ - - - -/* Initialise queue */ -int thpool_jobqueue_init(thpool_t* tp_p){ - tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue)); /* MALLOC job queue */ - if (tp_p->jobqueue==NULL) return -1; - tp_p->jobqueue->tail=NULL; - tp_p->jobqueue->head=NULL; - tp_p->jobqueue->jobsN=0; - return 0; -} - - -/* Add job to queue */ -void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p){ /* remember that job prev and next point to NULL */ - - newjob_p->next=NULL; - newjob_p->prev=NULL; - - thpool_job_t *oldFirstJob; - oldFirstJob = tp_p->jobqueue->head; - - /* fix jobs' pointers */ - switch(tp_p->jobqueue->jobsN){ - - case 0: /* if there are no jobs in queue */ - tp_p->jobqueue->tail=newjob_p; - tp_p->jobqueue->head=newjob_p; - break; - - default: /* if there are already jobs in queue */ - oldFirstJob->prev=newjob_p; - newjob_p->next=oldFirstJob; - tp_p->jobqueue->head=newjob_p; - - } - - (tp_p->jobqueue->jobsN)++; /* increment amount of jobs in queue */ - sem_post(tp_p->jobqueue->queueSem); - - int sval; - sem_getvalue(tp_p->jobqueue->queueSem, &sval); -} - - -/* Remove job from queue */ -int thpool_jobqueue_removelast(thpool_t* tp_p){ - thpool_job_t *oldLastJob; - oldLastJob = tp_p->jobqueue->tail; - - /* fix jobs' pointers */ - switch(tp_p->jobqueue->jobsN){ - - case 0: /* if there are no jobs in queue */ - return -1; - break; - - case 1: /* if there is only one job in queue */ - tp_p->jobqueue->tail=NULL; - tp_p->jobqueue->head=NULL; - break; - - default: /* if there are more than one jobs in queue */ - oldLastJob->prev->next=NULL; /* the almost last item */ - tp_p->jobqueue->tail=oldLastJob->prev; - - } - - (tp_p->jobqueue->jobsN)--; - - int sval; - sem_getvalue(tp_p->jobqueue->queueSem, &sval); - return 0; -} - - -/* Get first element from queue */ -thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p){ - return tp_p->jobqueue->tail; -} - -/* Remove and deallocate all jobs in queue */ -void thpool_jobqueue_empty(thpool_t* tp_p){ - - thpool_job_t* curjob; - curjob=tp_p->jobqueue->tail; - - while(tp_p->jobqueue->jobsN){ - tp_p->jobqueue->tail=curjob->prev; - free(curjob); - curjob=tp_p->jobqueue->tail; - tp_p->jobqueue->jobsN--; - } - - /* Fix head and tail */ - tp_p->jobqueue->tail=NULL; - tp_p->jobqueue->head=NULL; -} -- cgit v1.2.3