Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opaque handles can be fully defined in thpool.h #65

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
cmake_minimum_required(VERSION 3.0.0)
project(C-Thread-Pool LANGUAGES C)

# define the DEBUG macro when building in debug mode
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -DTHPOOL_DEBUG -Wall -Wextra")

# require pthreads to be available
find_package(Threads REQUIRED)

install(FILES thpool.h DESTINATION include/C-Thread-Pool)

add_library(thpool_static STATIC thpool.c)
install(TARGETS thpool_static DESTINATION lib)
set_target_properties(thpool_static PROPERTIES OUTPUT_NAME thpool)

add_library(thpool_shared SHARED thpool.c)
install(TARGETS thpool_shared DESTINATION lib)
set_target_properties(thpool_shared PROPERTIES OUTPUT_NAME thpool)
17 changes: 14 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,19 @@ If this project reduced your development time feel free to buy me a coffee.

## Run an example

The library is not precompiled so you have to compile it with your project. The thread pool
uses POSIX threads so if you compile with gcc on Linux you have to use the flag `-pthread` like this:
The library can be compiled into a static/shared library file with CMake:

```
mkdir build
cd build
cmake -DCMAKE_INSTALL_PREFIX=$INSTALL_PATH ..
make
make install
```

allowing for it to be linked against your code.

Otherwise, compile it with your project:

gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example

Expand All @@ -31,7 +42,7 @@ Then run the executable like this:

## Basic usage

1. Include the header in your source file: `#include "thpool.h"`
1. Include the header in your source file: `#include "C-Thread-Pool/thpool.h"`
2. Create a thread pool with number of threads you want: `threadpool thpool = thpool_init(4);`
3. Add work to the pool: `thpool_add_work(thpool, (void*)function_p, (void*)arg_p);`

Expand Down
73 changes: 53 additions & 20 deletions thpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*
********************************/

#define _POSIX_C_SOURCE 200809L
#define _GNU_SOURCE
#include <unistd.h>
#include <signal.h>
#include <stdio.h>
Expand All @@ -34,6 +34,8 @@
#define err(str)
#endif

static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t thread_cond = PTHREAD_COND_INITIALIZER;
static volatile int threads_keepalive;
static volatile int threads_on_hold;

Expand Down Expand Up @@ -93,7 +95,7 @@ typedef struct thpool_{
/* ========================== PROTOTYPES ============================ */


static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id);
static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id);
static void* thread_do(struct thread* thread_p);
static void thread_hold(int sig_id);
static void thread_destroy(struct thread* thread_p);
Expand All @@ -118,10 +120,11 @@ static void bsem_wait(struct bsem *bsem_p);


/* Initialise thread pool */
struct thpool_* thpool_init(int num_threads){

threadpool thpool_init(int num_threads){
pthread_mutex_lock(&thread_mutex);
threads_on_hold = 0;
threads_keepalive = 1;
pthread_mutex_unlock(&thread_mutex);

if (num_threads < 0){
num_threads = 0;
Expand All @@ -132,7 +135,8 @@ struct thpool_* thpool_init(int num_threads){
thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_));
if (thpool_p == NULL){
err("thpool_init(): Could not allocate memory for thread pool\n");
return NULL;
threadpool null = { .dat = NULL };
return null;
}
thpool_p->num_threads_alive = 0;
thpool_p->num_threads_working = 0;
Expand All @@ -141,7 +145,8 @@ struct thpool_* thpool_init(int num_threads){
if (jobqueue_init(&thpool_p->jobqueue) == -1){
err("thpool_init(): Could not allocate memory for job queue\n");
free(thpool_p);
return NULL;
threadpool null = { .dat = NULL };
return null;
}

/* Make threads in pool */
Expand All @@ -150,7 +155,8 @@ struct thpool_* thpool_init(int num_threads){
err("thpool_init(): Could not allocate memory for threads\n");
jobqueue_destroy(&thpool_p->jobqueue);
free(thpool_p);
return NULL;
threadpool null = { .dat = NULL };
return null;
}

pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
Expand All @@ -168,13 +174,15 @@ struct thpool_* thpool_init(int num_threads){
/* Wait for threads to initialize */
while (thpool_p->num_threads_alive != num_threads) {}

return thpool_p;
threadpool tp = { .dat = thpool_p };
return tp;
}


/* Add work to the thread pool */
int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){
job* newjob;
int thpool_add_work(threadpool tp, void (*function_p)(void*), void* arg_p){
thpool_* thpool_p = (thpool_*)tp.dat;
job* newjob;

newjob=(struct job*)malloc(sizeof(struct job));
if (newjob==NULL){
Expand All @@ -194,7 +202,8 @@ int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){


/* Wait until all jobs have finished */
void thpool_wait(thpool_* thpool_p){
void thpool_wait(threadpool tp){
thpool_* thpool_p = (thpool_*)tp.dat;
pthread_mutex_lock(&thpool_p->thcount_lock);
while (thpool_p->jobqueue.len || thpool_p->num_threads_working) {
pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
Expand All @@ -204,7 +213,9 @@ void thpool_wait(thpool_* thpool_p){


/* Destroy the threadpool */
void thpool_destroy(thpool_* thpool_p){
void thpool_destroy(threadpool tp){
thpool_* thpool_p = (thpool_*)tp.dat;

/* No need to destory if it's NULL */
if (thpool_p == NULL) return ;

Expand Down Expand Up @@ -243,37 +254,58 @@ void thpool_destroy(thpool_* thpool_p){


/* Pause all threads in threadpool */
void thpool_pause(thpool_* thpool_p) {
void thpool_pause(threadpool tp) {
thpool_* thpool_p = (thpool_*)tp.dat;
int n;

pthread_mutex_lock(&thread_mutex);
threads_on_hold = 1;
pthread_mutex_unlock(&thread_mutex);

for (n=0; n < thpool_p->num_threads_alive; n++){
pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1);
}
}


/* Resume all threads in threadpool */
void thpool_resume(thpool_* thpool_p) {
void thpool_resume(threadpool tp) {
// resuming a single threadpool hasn't been
// implemented yet, meanwhile this supresses
// the warnings
(void)thpool_p;
(void)tp;

pthread_mutex_lock(&thread_mutex);
threads_on_hold = 0;
pthread_cond_broadcast(&thread_cond);
pthread_mutex_unlock(&thread_mutex);
}


int thpool_num_threads_working(thpool_* thpool_p){
int thpool_num_threads_working(threadpool tp){
thpool_* thpool_p = (thpool_*)tp.dat;
return thpool_p->num_threads_working;
}

int thpool_thread_index(threadpool tp, pthread_t pthread) {
thpool_* thpool_p = (thpool_*)tp.dat;
int n;
const int num_threads = thpool_p->num_threads_alive;

for (n=0; n<num_threads; n++){
if (pthread_equal(pthread, thpool_p->threads[n]->pthread))
return n;
}
return -1;
}




/* ============================ THREAD ============================== */


/* Initialize a thread in the thread pool
/* Initialize a thread in the thread pool [not exported]
*
* @param thread address to the pointer of the thread to be created
* @param id id to be given to the thread
Expand All @@ -298,11 +330,12 @@ static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){

/* Sets the calling thread on hold */
static void thread_hold(int sig_id) {
(void)sig_id;
threads_on_hold = 1;
(void)sig_id;
pthread_mutex_lock(&thread_mutex);
while (threads_on_hold){
sleep(1);
pthread_cond_wait(&thread_cond, &thread_mutex);
}
pthread_mutex_unlock(&thread_mutex);
}


Expand Down
59 changes: 45 additions & 14 deletions thpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ extern "C" {
/* =================================== API ======================================= */


typedef struct thpool_* threadpool;
typedef struct { void* dat; } threadpool;


/**
Expand All @@ -32,11 +32,28 @@ typedef struct thpool_* threadpool;
*
* @param num_threads number of threads to be created in the threadpool
* @return threadpool created threadpool on success,
* NULL on error
* "null" threadpool on error (see thpool_null())
*/
threadpool thpool_init(int num_threads);



/** @file */
/**
* @brief test whether thpool_init() returned successfully
*
* @example
*
* thpool = thpool_init(4);
* if (thpool_null(thpool)) {
* fprintf(stderr, "thpool_init() failed!\n");
* ..
* }
*/

#define thpool_null(THREADPOOL) ((THREADPOOL).dat == NULL)


/**
* @brief Add work to the job queue
*
Expand All @@ -59,12 +76,12 @@ threadpool thpool_init(int num_threads);
* ..
* }
*
* @param threadpool threadpool to which the work will be added
* @param tp threadpool to which the work will be added
* @param function_p pointer to function to add as work
* @param arg_p pointer to an argument
* @return 0 on successs, -1 otherwise.
*/
int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);
int thpool_add_work(threadpool tp, void (*function_p)(void*), void* arg_p);


/**
Expand All @@ -91,10 +108,10 @@ int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);
* puts("All added work has finished");
* ..
*
* @param threadpool the threadpool to wait for
* @param tp the threadpool to wait for
* @return nothing
*/
void thpool_wait(threadpool);
void thpool_wait(threadpool tp);


/**
Expand All @@ -115,10 +132,10 @@ void thpool_wait(threadpool);
* ..
* thpool_resume(thpool); // Let the threads start their magic
*
* @param threadpool the threadpool where the threads should be paused
* @param tp the threadpool where the threads should be paused
* @return nothing
*/
void thpool_pause(threadpool);
void thpool_pause(threadpool tp);


/**
Expand All @@ -131,10 +148,10 @@ void thpool_pause(threadpool);
* thpool_resume(thpool);
* ..
*
* @param threadpool the threadpool where the threads should be unpaused
* @param tp the threadpool where the threads should be unpaused
* @return nothing
*/
void thpool_resume(threadpool);
void thpool_resume(threadpool tp);


/**
Expand All @@ -153,10 +170,10 @@ void thpool_resume(threadpool);
* return 0;
* }
*
* @param threadpool the threadpool to destroy
* @param tp the threadpool to destroy
* @return nothing
*/
void thpool_destroy(threadpool);
void thpool_destroy(threadpool tp);


/**
Expand All @@ -174,10 +191,24 @@ void thpool_destroy(threadpool);
* return 0;
* }
*
* @param threadpool the threadpool of interest
* @param tp the threadpool
* @return integer number of threads working
*/
int thpool_num_threads_working(threadpool);
int thpool_num_threads_working(threadpool tp);


/**
* @brief return the index for a given thread-ID
*
* If we don't want to expose access to our threads, the threads themselves
* can still come to us to ask for their index, which might be useful for
* maintaining application-specific data.
*
* @param tp the threadpool
* @param pthread pthread-ID -- e.g. via pthread_self()
* @return integer index of the provided thread (or -1)
*/
int thpool_thread_index(threadpool tp, pthread_t pthread);


#ifdef __cplusplus
Expand Down