From d44e5eb6c4c2c0be83358ee7db24d71605d664e8 Mon Sep 17 00:00:00 2001 From: mutantturkey Date: Tue, 19 Jun 2012 11:30:39 -0400 Subject: initial commit of a threaded mask filter --- fly-tools/ThreadedFilterFlyMask/thpool.c | 283 +++++++++++++++++++++++++++++++ 1 file changed, 283 insertions(+) create mode 100644 fly-tools/ThreadedFilterFlyMask/thpool.c (limited to 'fly-tools/ThreadedFilterFlyMask/thpool.c') diff --git a/fly-tools/ThreadedFilterFlyMask/thpool.c b/fly-tools/ThreadedFilterFlyMask/thpool.c new file mode 100644 index 0000000..5d7cbe3 --- /dev/null +++ b/fly-tools/ThreadedFilterFlyMask/thpool.c @@ -0,0 +1,283 @@ +/* ******************************** + * + * Author: Johan Hanssen Seferidis + * Date: 12/08/2011 + * Update: 01/11/2011 + * License: LGPL + * + * + *//** @file thpool.h *//* + ********************************/ + +/* Library providing a threading pool where you can add work. For an example on + * usage you refer to the main file found in the same package */ + +/* + * Fast reminders: + * + * tp = threadpool + * thpool = threadpool + * thpool_t = threadpool type + * tp_p = threadpool pointer + * sem = semaphore + * xN = x can be any string. N stands for amount + * + * */ + +#include +#include +#include +#include +#include + +#include "thpool.h" /* here you can also find the interface to each function */ + + +static int thpool_keepalive=1; + +/* Create mutex variable */ +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* used to serialize queue access */ + + + + +/* Initialise thread pool */ +thpool_t* thpool_init(int threadsN){ + thpool_t* tp_p; + + if (!threadsN || threadsN<1) threadsN=1; + + /* Make new thread pool */ + tp_p=(thpool_t*)malloc(sizeof(thpool_t)); /* MALLOC thread pool */ + if (tp_p==NULL){ + fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n"); + return NULL; + } + tp_p->threads=(pthread_t*)malloc(threadsN*sizeof(pthread_t)); /* MALLOC thread IDs */ + if (tp_p->threads==NULL){ + fprintf(stderr, "thpool_init(): Could not allocate memory for thread IDs\n"); + return NULL; + } + tp_p->threadsN=threadsN; + + /* Initialise the job queue */ + if (thpool_jobqueue_init(tp_p)==-1){ + fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n"); + return NULL; + } + + /* Initialise semaphore*/ + tp_p->jobqueue->queueSem=(sem_t*)malloc(sizeof(sem_t)); /* MALLOC job queue semaphore */ + sem_init(tp_p->jobqueue->queueSem, 0, 0); /* no shared, initial value */ + + /* Make threads in pool */ + int t; + for (t=0; tthreads[t]), NULL, (void *)thpool_thread_do, (void *)tp_p); /* MALLOCS INSIDE PTHREAD HERE */ + } + + return tp_p; +} + + +/* What each individual thread is doing + * */ +/* There are two scenarios here. One is everything works as it should and second if + * the thpool is to be killed. In that manner we try to BYPASS sem_wait and end each thread. */ +void thpool_thread_do(thpool_t* tp_p){ + + while(thpool_keepalive){ + + if (sem_wait(tp_p->jobqueue->queueSem)) {/* WAITING until there is work in the queue */ + perror("thpool_thread_do(): Waiting for semaphore"); + exit(1); + } + + if (thpool_keepalive){ + + /* Read job from queue and execute it */ + void*(*func_buff)(void* arg); + void* arg_buff; + thpool_job_t* job_p; + + pthread_mutex_lock(&mutex); /* LOCK */ + + job_p = thpool_jobqueue_peek(tp_p); + func_buff=job_p->function; + arg_buff =job_p->arg; + thpool_jobqueue_removelast(tp_p); + + pthread_mutex_unlock(&mutex); /* UNLOCK */ + + func_buff(arg_buff); /* run function */ + free(job_p); /* DEALLOC job */ + } + else + { + return; /* EXIT thread*/ + } + } + return; +} + + +/* Add work to the thread pool */ +int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p){ + thpool_job_t* newJob; + + newJob=(thpool_job_t*)malloc(sizeof(thpool_job_t)); /* MALLOC job */ + if (newJob==NULL){ + fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n"); + exit(1); + } + + /* add function and argument */ + newJob->function=function_p; + newJob->arg=arg_p; + + /* add job to queue */ + pthread_mutex_lock(&mutex); /* LOCK */ + thpool_jobqueue_add(tp_p, newJob); + pthread_mutex_unlock(&mutex); /* UNLOCK */ +} + + +/* Destroy the threadpool */ +void thpool_destroy(thpool_t* tp_p){ + int t; + + while(tp_p->jobqueue->jobsN > 0) { + usleep(500000); + } + /* End each thread's infinite loop */ + thpool_keepalive=0; + + /* Awake idle threads waiting at semaphore */ + for (t=0; t<(tp_p->threadsN); t++){ + if (sem_post(tp_p->jobqueue->queueSem)){ + fprintf(stderr, "thpool_destroy(): Could not bypass sem_wait()\n"); + } + } + + /* Kill semaphore */ + if (sem_destroy(tp_p->jobqueue->queueSem)!=0){ + fprintf(stderr, "thpool_destroy(): Could not destroy semaphore\n"); + } + + /* Wait for threads to finish */ + for (t=0; t<(tp_p->threadsN); t++){ + pthread_join(tp_p->threads[t], NULL); + } + + thpool_jobqueue_empty(tp_p); + + /* Dealloc */ + free(tp_p->threads); /* DEALLOC threads */ + free(tp_p->jobqueue->queueSem); /* DEALLOC job queue semaphore */ + free(tp_p->jobqueue); /* DEALLOC job queue */ + free(tp_p); /* DEALLOC thread pool */ +} + + + +/* =================== JOB QUEUE OPERATIONS ===================== */ + + + +/* Initialise queue */ +int thpool_jobqueue_init(thpool_t* tp_p){ + tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue)); /* MALLOC job queue */ + if (tp_p->jobqueue==NULL) return -1; + tp_p->jobqueue->tail=NULL; + tp_p->jobqueue->head=NULL; + tp_p->jobqueue->jobsN=0; + return 0; +} + + +/* Add job to queue */ +void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p){ /* remember that job prev and next point to NULL */ + + newjob_p->next=NULL; + newjob_p->prev=NULL; + + thpool_job_t *oldFirstJob; + oldFirstJob = tp_p->jobqueue->head; + + /* fix jobs' pointers */ + switch(tp_p->jobqueue->jobsN){ + + case 0: /* if there are no jobs in queue */ + tp_p->jobqueue->tail=newjob_p; + tp_p->jobqueue->head=newjob_p; + break; + + default: /* if there are already jobs in queue */ + oldFirstJob->prev=newjob_p; + newjob_p->next=oldFirstJob; + tp_p->jobqueue->head=newjob_p; + + } + + (tp_p->jobqueue->jobsN)++; /* increment amount of jobs in queue */ + sem_post(tp_p->jobqueue->queueSem); + + int sval; + sem_getvalue(tp_p->jobqueue->queueSem, &sval); +} + + +/* Remove job from queue */ +int thpool_jobqueue_removelast(thpool_t* tp_p){ + thpool_job_t *oldLastJob; + oldLastJob = tp_p->jobqueue->tail; + + /* fix jobs' pointers */ + switch(tp_p->jobqueue->jobsN){ + + case 0: /* if there are no jobs in queue */ + return -1; + break; + + case 1: /* if there is only one job in queue */ + tp_p->jobqueue->tail=NULL; + tp_p->jobqueue->head=NULL; + break; + + default: /* if there are more than one jobs in queue */ + oldLastJob->prev->next=NULL; /* the almost last item */ + tp_p->jobqueue->tail=oldLastJob->prev; + + } + + (tp_p->jobqueue->jobsN)--; + + int sval; + sem_getvalue(tp_p->jobqueue->queueSem, &sval); + return 0; +} + + +/* Get first element from queue */ +thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p){ + return tp_p->jobqueue->tail; +} + +/* Remove and deallocate all jobs in queue */ +void thpool_jobqueue_empty(thpool_t* tp_p){ + + thpool_job_t* curjob; + curjob=tp_p->jobqueue->tail; + + while(tp_p->jobqueue->jobsN){ + tp_p->jobqueue->tail=curjob->prev; + free(curjob); + curjob=tp_p->jobqueue->tail; + tp_p->jobqueue->jobsN--; + } + + /* Fix head and tail */ + tp_p->jobqueue->tail=NULL; + tp_p->jobqueue->head=NULL; +} -- cgit v1.2.3