aboutsummaryrefslogtreecommitdiff
path: root/fly-tools
diff options
context:
space:
mode:
authormutantturkey <mutantturke@gmail.com>2012-06-19 11:30:39 -0400
committermutantturkey <mutantturke@gmail.com>2012-06-19 11:30:39 -0400
commitd44e5eb6c4c2c0be83358ee7db24d71605d664e8 (patch)
tree9d10e299ead022a68de52a8c91b49d595e0f7f14 /fly-tools
parentfb8e1756a6bd715f983d19a788e526383c314eab (diff)
initial commit of a threaded mask filter
Diffstat (limited to 'fly-tools')
-rw-r--r--fly-tools/ThreadedFilterFlyMask/README93
-rw-r--r--fly-tools/ThreadedFilterFlyMask/main.c108
-rw-r--r--fly-tools/ThreadedFilterFlyMask/thpool.c283
-rw-r--r--fly-tools/ThreadedFilterFlyMask/thpool.h235
4 files changed, 719 insertions, 0 deletions
diff --git a/fly-tools/ThreadedFilterFlyMask/README b/fly-tools/ThreadedFilterFlyMask/README
new file mode 100644
index 0000000..f679b03
--- /dev/null
+++ b/fly-tools/ThreadedFilterFlyMask/README
@@ -0,0 +1,93 @@
+Author: Johan Hanssen Seferidis
+Created: 2011-08-12
+Updated: 2011-11-07
+
+
+
+
+=================================== Compiling =====================================
+The library is not precompiled so you have to compile it with your project. The thread pool
+uses POSIX threads so if you compile with gcc you have to use the flag -pthread like this:
+
+ gcc main.c thpool.c -pthread -o test
+
+
+Then run the executable like this:
+
+ ./test
+
+
+
+====================================== Usage ======================================
+
+1. Make a thread pool: thpool_t* thpool;
+2. Initialise the thread pool with number
+ of threads(workers) you want: thpool=thpool_init(4);
+3. Add work to the pool: thpool_add_work(thpool, (void*)doSth, (void*)arg);
+4. Destroy pool: thpool_destroy(thpool);
+
+
+
+
+
+=============================== Threadpool Interface ===============================
+
+
+NAME
+ thpool_t* thpool_init(int num_of_threads);
+
+SYNOPSIS
+
+ #include <thpool.h>
+
+ thpool_t* thpool_init(int num_of_threads);
+
+DESCRIPTION
+
+ Initialises the threadpool. On success a threadpool structure is returned. Otherwise if memory could not be allocated NULL is returned. The argument which is the number of threads in the threadpool should be a thoughfull choice. A common suggestion is to use as many threads as the ones supported by your cpu.
+
+ Example:
+ thpool_t* myThreadpool; //First we declare a threadpool
+ myThreadpool=thpool_init(4); //then we initialise it to 4 threads
+
+
+-----------------------------------------------------------------------------------
+
+
+
+NAME
+ thpool_add_work(thpool_t* thpool, void *(*function_p)(void*), void* arg_p);
+
+SYNOPSIS
+
+ #include <thpool.h>
+
+ int thpool_add_work(thpool_t* thpool, void *(*function_p)(void*), void* arg_p);
+
+DESCRIPTION
+
+ Adds work to the thread pool. Work is concidered an individual function with an argument. First argument is a pointer to the pool itself. The second argument is a pointer to a function and third argument is a pointer to an argument. To pass multiple arguments just use a struct. If the function you want to pass doesn't fit the parameters of this prototype, use casting. If your function or argument doesn't fit the parameters' and return's value type then you should use casting to avoid warnings from the compiler.
+
+ Example:
+ void printSth(char* str); //Prints a text on the screen
+ thpool_add_work(thpool, (void*)printSth, (void*)str); //Pay attention to the casting
+
+
+-----------------------------------------------------------------------------------
+
+
+NAME
+ void thpool_destroy(thpool_t* tp_p);
+
+SYNOPSIS
+
+ #include <thpool.h>
+
+ void thpool_destroy(thpool_t* tp_p);
+
+DESCRIPTION
+
+ This function will destroy a threadpool. If some threads are working in the pool then thpool_destroy() will wait for them to finish. Once they are finished the threadpool is deallocated releasing all resources back to the system.
+
+ Example:
+ thpool_destroy(threadpool_p); //threadpool_p being a pointer to a thpool_t
diff --git a/fly-tools/ThreadedFilterFlyMask/main.c b/fly-tools/ThreadedFilterFlyMask/main.c
new file mode 100644
index 0000000..80da5e9
--- /dev/null
+++ b/fly-tools/ThreadedFilterFlyMask/main.c
@@ -0,0 +1,108 @@
+/*
+ * This is just an example on how to use the thpool library
+ *
+ * We create a pool of 4 threads and then add 20 tasks to the pool(10 task1
+ * functions and 10 task2 functions).
+ *
+ * Task1 doesn't take any arguments. Task2 takes an integer. Task2 is used to show
+ * how to add work to the thread pool with an argument.
+ *
+ * As soon as we add the tasks to the pool, the threads will run them. One thread
+ * may run x tasks in a row so if you see as output the same thread running several
+ * tasks, it's not an error.
+ *
+ * All jobs will not be completed and in fact maybe even none will. You can add a sleep()
+ * function if you want to complete all tasks in this test file to be able and see clearer
+ * what is going on.
+ *
+ * */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <libgen.h>
+#include <wand/MagickWand.h>
+#include "thpool.h"
+
+#define ThrowWandException(wand) { char *description; ExceptionType severity; description=MagickGetException(wand,&severity); (void) fprintf(stderr,"%s %s %lu %s\n",GetMagickModule(),description); description=(char *) MagickRelinquishMemory(description); exit(-1); }
+
+MagickWand *background;
+char **global_argv;
+
+
+
+
+void convert_image(char *file) {
+
+ printf("filename: %s", file);
+ MagickWand *mask = NewMagickWand();
+ MagickBooleanType status;
+ char output_name[256];
+
+
+ if(MagickReadImage(mask, file) == MagickFalse) {
+ ThrowWandException(mask);
+ printf("could not read file: %s", file );
+ return;
+ }
+
+ /* image ops */
+ MagickResizeImage(mask,10,10,LanczosFilter,1.0);
+
+ sprintf(output_name, "%s%s", global_argv[3], basename(file));
+
+
+ status=MagickWriteImages(mask, output_name , MagickTrue);
+ if ( status == MagickFalse ) {
+ puts("write error");
+ }
+
+ mask = DestroyMagickWand(mask);
+}
+
+int main( int argc, char **argv){
+
+ // argv 1 = Background
+ // argv 2 = input list
+ // argv 3 = output folder
+
+ char *stream;
+
+ stream = "hey \n";
+
+ printf("%s", stream);
+ if (stream[strlen(stream)] == '\n') {
+ stream[strlen(stream) - 1] == '\0';
+}
+ printf("%s", stream);
+ MagickBooleanType status;
+ global_argv = argv;
+ MagickWandGenesis();
+
+ background = NewMagickWand();
+
+ status=MagickReadImage(background,argv[1]);
+ if (status == MagickFalse) {
+ puts("background could not load error");
+ exit(0);
+ }
+
+ thpool_t* threadpool; /* make a new thread pool structure */
+ threadpool=thpool_init(1); /* initialise it to 4 number of threads */
+
+ char line[256];
+ FILE *f = fopen ( argv[2], "r" );
+ if ( f != NULL ) {
+ while ( fgets ( line, sizeof line, f ) != NULL ) {
+ thpool_add_work(threadpool, (void*)convert_image, (void*)line);
+ }
+ fclose ( f );
+ }
+ else {
+ exit(0);
+ }
+
+ puts("Will kill threadpool");
+ thpool_destroy(threadpool);
+ MagickWandTerminus();
+ return 0;
+}
diff --git a/fly-tools/ThreadedFilterFlyMask/thpool.c b/fly-tools/ThreadedFilterFlyMask/thpool.c
new file mode 100644
index 0000000..5d7cbe3
--- /dev/null
+++ b/fly-tools/ThreadedFilterFlyMask/thpool.c
@@ -0,0 +1,283 @@
+/* ********************************
+ *
+ * Author: Johan Hanssen Seferidis
+ * Date: 12/08/2011
+ * Update: 01/11/2011
+ * License: LGPL
+ *
+ *
+ *//** @file thpool.h *//*
+ ********************************/
+
+/* Library providing a threading pool where you can add work. For an example on
+ * usage you refer to the main file found in the same package */
+
+/*
+ * Fast reminders:
+ *
+ * tp = threadpool
+ * thpool = threadpool
+ * thpool_t = threadpool type
+ * tp_p = threadpool pointer
+ * sem = semaphore
+ * xN = x can be any string. N stands for amount
+ *
+ * */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <semaphore.h>
+#include <errno.h>
+
+#include "thpool.h" /* here you can also find the interface to each function */
+
+
+static int thpool_keepalive=1;
+
+/* Create mutex variable */
+pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* used to serialize queue access */
+
+
+
+
+/* Initialise thread pool */
+thpool_t* thpool_init(int threadsN){
+ thpool_t* tp_p;
+
+ if (!threadsN || threadsN<1) threadsN=1;
+
+ /* Make new thread pool */
+ tp_p=(thpool_t*)malloc(sizeof(thpool_t)); /* MALLOC thread pool */
+ if (tp_p==NULL){
+ fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n");
+ return NULL;
+ }
+ tp_p->threads=(pthread_t*)malloc(threadsN*sizeof(pthread_t)); /* MALLOC thread IDs */
+ if (tp_p->threads==NULL){
+ fprintf(stderr, "thpool_init(): Could not allocate memory for thread IDs\n");
+ return NULL;
+ }
+ tp_p->threadsN=threadsN;
+
+ /* Initialise the job queue */
+ if (thpool_jobqueue_init(tp_p)==-1){
+ fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n");
+ return NULL;
+ }
+
+ /* Initialise semaphore*/
+ tp_p->jobqueue->queueSem=(sem_t*)malloc(sizeof(sem_t)); /* MALLOC job queue semaphore */
+ sem_init(tp_p->jobqueue->queueSem, 0, 0); /* no shared, initial value */
+
+ /* Make threads in pool */
+ int t;
+ for (t=0; t<threadsN; t++){
+ printf("Created thread %d in pool \n", t);
+ pthread_create(&(tp_p->threads[t]), NULL, (void *)thpool_thread_do, (void *)tp_p); /* MALLOCS INSIDE PTHREAD HERE */
+ }
+
+ return tp_p;
+}
+
+
+/* What each individual thread is doing
+ * */
+/* There are two scenarios here. One is everything works as it should and second if
+ * the thpool is to be killed. In that manner we try to BYPASS sem_wait and end each thread. */
+void thpool_thread_do(thpool_t* tp_p){
+
+ while(thpool_keepalive){
+
+ if (sem_wait(tp_p->jobqueue->queueSem)) {/* WAITING until there is work in the queue */
+ perror("thpool_thread_do(): Waiting for semaphore");
+ exit(1);
+ }
+
+ if (thpool_keepalive){
+
+ /* Read job from queue and execute it */
+ void*(*func_buff)(void* arg);
+ void* arg_buff;
+ thpool_job_t* job_p;
+
+ pthread_mutex_lock(&mutex); /* LOCK */
+
+ job_p = thpool_jobqueue_peek(tp_p);
+ func_buff=job_p->function;
+ arg_buff =job_p->arg;
+ thpool_jobqueue_removelast(tp_p);
+
+ pthread_mutex_unlock(&mutex); /* UNLOCK */
+
+ func_buff(arg_buff); /* run function */
+ free(job_p); /* DEALLOC job */
+ }
+ else
+ {
+ return; /* EXIT thread*/
+ }
+ }
+ return;
+}
+
+
+/* Add work to the thread pool */
+int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p){
+ thpool_job_t* newJob;
+
+ newJob=(thpool_job_t*)malloc(sizeof(thpool_job_t)); /* MALLOC job */
+ if (newJob==NULL){
+ fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n");
+ exit(1);
+ }
+
+ /* add function and argument */
+ newJob->function=function_p;
+ newJob->arg=arg_p;
+
+ /* add job to queue */
+ pthread_mutex_lock(&mutex); /* LOCK */
+ thpool_jobqueue_add(tp_p, newJob);
+ pthread_mutex_unlock(&mutex); /* UNLOCK */
+}
+
+
+/* Destroy the threadpool */
+void thpool_destroy(thpool_t* tp_p){
+ int t;
+
+ while(tp_p->jobqueue->jobsN > 0) {
+ usleep(500000);
+ }
+ /* End each thread's infinite loop */
+ thpool_keepalive=0;
+
+ /* Awake idle threads waiting at semaphore */
+ for (t=0; t<(tp_p->threadsN); t++){
+ if (sem_post(tp_p->jobqueue->queueSem)){
+ fprintf(stderr, "thpool_destroy(): Could not bypass sem_wait()\n");
+ }
+ }
+
+ /* Kill semaphore */
+ if (sem_destroy(tp_p->jobqueue->queueSem)!=0){
+ fprintf(stderr, "thpool_destroy(): Could not destroy semaphore\n");
+ }
+
+ /* Wait for threads to finish */
+ for (t=0; t<(tp_p->threadsN); t++){
+ pthread_join(tp_p->threads[t], NULL);
+ }
+
+ thpool_jobqueue_empty(tp_p);
+
+ /* Dealloc */
+ free(tp_p->threads); /* DEALLOC threads */
+ free(tp_p->jobqueue->queueSem); /* DEALLOC job queue semaphore */
+ free(tp_p->jobqueue); /* DEALLOC job queue */
+ free(tp_p); /* DEALLOC thread pool */
+}
+
+
+
+/* =================== JOB QUEUE OPERATIONS ===================== */
+
+
+
+/* Initialise queue */
+int thpool_jobqueue_init(thpool_t* tp_p){
+ tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue)); /* MALLOC job queue */
+ if (tp_p->jobqueue==NULL) return -1;
+ tp_p->jobqueue->tail=NULL;
+ tp_p->jobqueue->head=NULL;
+ tp_p->jobqueue->jobsN=0;
+ return 0;
+}
+
+
+/* Add job to queue */
+void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p){ /* remember that job prev and next point to NULL */
+
+ newjob_p->next=NULL;
+ newjob_p->prev=NULL;
+
+ thpool_job_t *oldFirstJob;
+ oldFirstJob = tp_p->jobqueue->head;
+
+ /* fix jobs' pointers */
+ switch(tp_p->jobqueue->jobsN){
+
+ case 0: /* if there are no jobs in queue */
+ tp_p->jobqueue->tail=newjob_p;
+ tp_p->jobqueue->head=newjob_p;
+ break;
+
+ default: /* if there are already jobs in queue */
+ oldFirstJob->prev=newjob_p;
+ newjob_p->next=oldFirstJob;
+ tp_p->jobqueue->head=newjob_p;
+
+ }
+
+ (tp_p->jobqueue->jobsN)++; /* increment amount of jobs in queue */
+ sem_post(tp_p->jobqueue->queueSem);
+
+ int sval;
+ sem_getvalue(tp_p->jobqueue->queueSem, &sval);
+}
+
+
+/* Remove job from queue */
+int thpool_jobqueue_removelast(thpool_t* tp_p){
+ thpool_job_t *oldLastJob;
+ oldLastJob = tp_p->jobqueue->tail;
+
+ /* fix jobs' pointers */
+ switch(tp_p->jobqueue->jobsN){
+
+ case 0: /* if there are no jobs in queue */
+ return -1;
+ break;
+
+ case 1: /* if there is only one job in queue */
+ tp_p->jobqueue->tail=NULL;
+ tp_p->jobqueue->head=NULL;
+ break;
+
+ default: /* if there are more than one jobs in queue */
+ oldLastJob->prev->next=NULL; /* the almost last item */
+ tp_p->jobqueue->tail=oldLastJob->prev;
+
+ }
+
+ (tp_p->jobqueue->jobsN)--;
+
+ int sval;
+ sem_getvalue(tp_p->jobqueue->queueSem, &sval);
+ return 0;
+}
+
+
+/* Get first element from queue */
+thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p){
+ return tp_p->jobqueue->tail;
+}
+
+/* Remove and deallocate all jobs in queue */
+void thpool_jobqueue_empty(thpool_t* tp_p){
+
+ thpool_job_t* curjob;
+ curjob=tp_p->jobqueue->tail;
+
+ while(tp_p->jobqueue->jobsN){
+ tp_p->jobqueue->tail=curjob->prev;
+ free(curjob);
+ curjob=tp_p->jobqueue->tail;
+ tp_p->jobqueue->jobsN--;
+ }
+
+ /* Fix head and tail */
+ tp_p->jobqueue->tail=NULL;
+ tp_p->jobqueue->head=NULL;
+}
diff --git a/fly-tools/ThreadedFilterFlyMask/thpool.h b/fly-tools/ThreadedFilterFlyMask/thpool.h
new file mode 100644
index 0000000..39228ff
--- /dev/null
+++ b/fly-tools/ThreadedFilterFlyMask/thpool.h
@@ -0,0 +1,235 @@
+/**********************************
+ * @author Johan Hanssen Seferidis
+ * @date 12/08/2011
+ * Last update: 01/11/2011
+ * License: LGPL
+ *
+ **********************************/
+
+/* Description: Library providing a threading pool where you can add work on the fly. The number
+ * of threads in the pool is adjustable when creating the pool. In most cases
+ * this should equal the number of threads supported by your cpu.
+ *
+ * For an example on how to use the threadpool, check the main.c file or just read
+ * the documentation.
+ *
+ * In this header file a detailed overview of the functions and the threadpool logical
+ * scheme is present in case tweaking of the pool is needed.
+ * */
+
+/*
+ * Fast reminders:
+ *
+ * tp = threadpool
+ * thpool = threadpool
+ * thpool_t = threadpool type
+ * tp_p = threadpool pointer
+ * sem = semaphore
+ * xN = x can be any string. N stands for amount
+ *
+ * */
+
+/* _______________________________________________________
+ * / \
+ * | JOB QUEUE | job1 | job2 | job3 | job4 | .. |
+ * | |
+ * | threadpool | thread1 | thread2 | .. |
+ * \_______________________________________________________/
+ *
+ * Description: Jobs are added to the job queue. Once a thread in the pool
+ * is idle, it is assigned with the first job from the queue(and
+ * erased from the queue). It's each thread's job to read from
+ * the queue serially(using lock) and executing each job
+ * until the queue is empty.
+ *
+ *
+ * Scheme:
+ *
+ * thpool______ jobqueue____ ______
+ * | | | | .----------->|_job0_| Newly added job
+ * | | | head------------' |_job1_|
+ * | jobqueue----------------->| | |_job2_|
+ * | | | tail------------. |__..__|
+ * |___________| |___________| '----------->|_jobn_| Job for thread to take
+ *
+ *
+ * job0________
+ * | |
+ * | function---->
+ * | |
+ * | arg------->
+ * | | job1________
+ * | next-------------->| |
+ * |___________| | |..
+ */
+
+#ifndef _THPOOL_
+
+#define _THPOOL_
+
+#include <pthread.h>
+#include <semaphore.h>
+
+
+
+/* ================================= STRUCTURES ================================================ */
+
+
+/* Individual job */
+typedef struct thpool_job_t{
+ void* (*function)(void* arg); /**< function pointer */
+ void* arg; /**< function's argument */
+ struct thpool_job_t* next; /**< pointer to next job */
+ struct thpool_job_t* prev; /**< pointer to previous job */
+}thpool_job_t;
+
+
+/* Job queue as doubly linked list */
+typedef struct thpool_jobqueue{
+ thpool_job_t *head; /**< pointer to head of queue */
+ thpool_job_t *tail; /**< pointer to tail of queue */
+ int jobsN; /**< amount of jobs in queue */
+ sem_t *queueSem; /**< semaphore(this is probably just holding the same as jobsN) */
+}thpool_jobqueue;
+
+
+/* The threadpool */
+typedef struct thpool_t{
+ pthread_t* threads; /**< pointer to threads' ID */
+ int threadsN; /**< amount of threads */
+ thpool_jobqueue* jobqueue; /**< pointer to the job queue */
+}thpool_t;
+
+
+/* Container for all things that each thread is going to need */
+typedef struct thread_data{
+ pthread_mutex_t *mutex_p;
+ thpool_t *tp_p;
+}thread_data;
+
+
+
+/* =========================== FUNCTIONS ================================================ */
+
+
+/* ----------------------- Threadpool specific --------------------------- */
+
+/**
+ * @brief Initialize threadpool
+ *
+ * Allocates memory for the threadpool, jobqueue, semaphore and fixes
+ * pointers in jobqueue.
+ *
+ * @param number of threads to be used
+ * @return threadpool struct on success,
+ * NULL on error
+ */
+thpool_t* thpool_init(int threadsN);
+
+
+/**
+ * @brief What each thread is doing
+ *
+ * In principle this is an endless loop. The only time this loop gets interuppted is once
+ * thpool_destroy() is invoked.
+ *
+ * @param threadpool to use
+ * @return nothing
+ */
+void thpool_thread_do(thpool_t* tp_p);
+
+
+/**
+ * @brief Add work to the job queue
+ *
+ * Takes an action and its argument and adds it to the threadpool's job queue.
+ * If you want to add to work a function with more than one arguments then
+ * a way to implement this is by passing a pointer to a structure.
+ *
+ * ATTENTION: You have to cast both the function and argument to not get warnings.
+ *
+ * @param threadpool to where the work will be added to
+ * @param function to add as work
+ * @param argument to the above function
+ * @return int
+ */
+int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p);
+
+
+/**
+ * @brief Destroy the threadpool
+ *
+ * This will 'kill' the threadpool and free up memory. If threads are active when this
+ * is called, they will finish what they are doing and then they will get destroyied.
+ *
+ * @param threadpool a pointer to the threadpool structure you want to destroy
+ */
+void thpool_destroy(thpool_t* tp_p);
+
+
+
+/* ------------------------- Queue specific ------------------------------ */
+
+
+/**
+ * @brief Initialize queue
+ * @param pointer to threadpool
+ * @return 0 on success,
+ * -1 on memory allocation error
+ */
+int thpool_jobqueue_init(thpool_t* tp_p);
+
+
+/**
+ * @brief Add job to queue
+ *
+ * A new job will be added to the queue. The new job MUST be allocated
+ * before passed to this function or else other functions like thpool_jobqueue_empty()
+ * will be broken.
+ *
+ * @param pointer to threadpool
+ * @param pointer to the new job(MUST BE ALLOCATED)
+ * @return nothing
+ */
+void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p);
+
+
+/**
+ * @brief Remove last job from queue.
+ *
+ * This does not free allocated memory so be sure to have peeked() \n
+ * before invoking this as else there will result lost memory pointers.
+ *
+ * @param pointer to threadpool
+ * @return 0 on success,
+ * -1 if queue is empty
+ */
+int thpool_jobqueue_removelast(thpool_t* tp_p);
+
+
+/**
+ * @brief Get last job in queue (tail)
+ *
+ * Gets the last job that is inside the queue. This will work even if the queue
+ * is empty.
+ *
+ * @param pointer to threadpool structure
+ * @return job a pointer to the last job in queue,
+ * a pointer to NULL if the queue is empty
+ */
+thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p);
+
+
+/**
+ * @brief Remove and deallocate all jobs in queue
+ *
+ * This function will deallocate all jobs in the queue and set the
+ * jobqueue to its initialization values, thus tail and head pointing
+ * to NULL and amount of jobs equal to 0.
+ *
+ * @param pointer to threadpool structure
+ * */
+void thpool_jobqueue_empty(thpool_t* tp_p);
+
+
+#endif