Skip to content

Commit b6ed3ed

Browse files
committed
add thread pool implemention
1 parent 687d64d commit b6ed3ed

File tree

3 files changed

+198
-2
lines changed

3 files changed

+198
-2
lines changed

Coding/Linux_OS_ThreadPool.h

+159
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
#ifndef THREAD_POOL_H
2+
#define THREAD_POOL_H
3+
4+
#include <stdio.h>
5+
#include <stdlib.h>
6+
#include <unistd.h>
7+
#include <sys/types.h>
8+
#include <pthread.h>
9+
#include <assert.h>
10+
11+
12+
// 线程池里所有运行和等待的任务都是一个CThread_worker, 由于所有任务都在链表里,所以是一个链表结构
13+
typedef struct worker
14+
{
15+
16+
void *(*process) (void *arg); // 回调函数,任务运行时会调用此函数,注意也可声明成其它形式
17+
void *arg; // 回调函数的参数
18+
struct worker *next;
19+
} CThread_worker;
20+
21+
// 线程池结构
22+
typedef struct
23+
{
24+
pthread_mutex_t queue_lock;
25+
pthread_cond_t queue_ready;
26+
CThread_worker *queue_head; // 链表结构,线程池中所有等待任务
27+
bool shutdown; // 是否销毁线程池
28+
pthread_t *threadid; // 维持线程链表
29+
int max_thread_num; // 线程池中允许的活动线程数目
30+
int cur_queue_size; // 当前等待队列的任务数目
31+
} CThread_pool;
32+
33+
int pool_add_worker (void *(*process) (void *arg), void *arg);
34+
void *thread_routine (void *arg);
35+
static CThread_pool *pool = NULL;
36+
37+
void pool_init (int max_thread_num)
38+
{
39+
pool = (CThread_pool *) malloc (sizeof (CThread_pool));
40+
pthread_mutex_init (&(pool->queue_lock), NULL);
41+
pthread_cond_init (&(pool->queue_ready), NULL);
42+
pool->queue_head = NULL;
43+
pool->max_thread_num = max_thread_num;
44+
pool->cur_queue_size = 0;
45+
pool->shutdown = false;
46+
pool->threadid =
47+
(pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
48+
int i = 0;
49+
for (i = 0; i < max_thread_num; i++)
50+
{
51+
pthread_create (&(pool->threadid[i]), NULL, thread_routine,
52+
NULL);
53+
}
54+
}
55+
56+
//向线程池中加入任务
57+
int pool_add_worker (void *(*process) (void *arg), void *arg)
58+
{
59+
// 构造一个新任务
60+
CThread_worker *new_worker =
61+
(CThread_worker *) malloc (sizeof (CThread_worker));
62+
new_worker->process = process;
63+
new_worker->arg = arg;
64+
new_worker->next = NULL; // new_worker 为最后一个任务
65+
pthread_mutex_lock (&(pool->queue_lock));
66+
67+
// 将任务加入到等待队列中
68+
CThread_worker *member = pool->queue_head;
69+
if (member != NULL)
70+
{
71+
while (member->next != NULL)
72+
member = member->next;
73+
member->next = new_worker;
74+
}
75+
else
76+
{
77+
pool->queue_head = new_worker;
78+
}
79+
assert (pool->queue_head != NULL);
80+
pool->cur_queue_size++;
81+
pthread_mutex_unlock (&(pool->queue_lock));
82+
// 等待队列中有任务了,唤醒一个等待线程. 注意如果所有线程都在忙碌,这句没有任何作用
83+
pthread_cond_signal(&(pool->queue_ready));
84+
return 0;
85+
}
86+
87+
/*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直
88+
把任务运行完后再退出*/
89+
int pool_destroy ()
90+
{
91+
if (pool->shutdown)
92+
return -1; // 防止两次调用
93+
pool->shutdown = true;
94+
pthread_cond_broadcast (&(pool->queue_ready)); // 唤醒所有等待线程,线程池要销毁了
95+
// 阻塞等待线程退出,否则就成僵尸了
96+
int i;
97+
for (i = 0; i < pool->max_thread_num; i++)
98+
pthread_join (pool->threadid[i], NULL);
99+
free (pool->threadid);
100+
101+
// 销毁等待任务队列
102+
CThread_worker *head = NULL;
103+
while (pool->queue_head != NULL)
104+
{
105+
head = pool->queue_head;
106+
pool->queue_head = pool->queue_head->next;
107+
free (head);
108+
}
109+
// 销毁条件变量和互斥量
110+
pthread_mutex_destroy(&(pool->queue_lock));
111+
pthread_cond_destroy(&(pool->queue_ready));
112+
113+
free (pool);
114+
// 销毁后指针置空
115+
pool=NULL;
116+
return 0;
117+
}
118+
119+
void *thread_routine (void *arg)
120+
{
121+
printf ("starting thread 0x%x\n", pthread_self());
122+
while (1)
123+
{
124+
pthread_mutex_lock(&(pool->queue_lock));
125+
/*
126+
* 如果等待队列为0并且不销毁线程池,则处于阻塞状态;
127+
* 注意pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁
128+
*/
129+
while (pool->cur_queue_size == 0 && !pool->shutdown)
130+
{
131+
printf ("thread 0x%x is waiting\n", pthread_self ());
132+
pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
133+
}
134+
// 将要销毁线程池
135+
if (pool->shutdown)
136+
{
137+
// 遇到break,continue,return等跳转语句,千万不要忘记先解锁
138+
pthread_mutex_unlock(&(pool->queue_lock));
139+
printf ("thread 0x%x is exiting\n", pthread_self());
140+
pthread_exit(NULL);
141+
}
142+
printf ("thread 0x%x is starting to work\n", pthread_self());
143+
// assert是调试的好帮手
144+
assert (pool->cur_queue_size != 0);
145+
assert (pool->queue_head != NULL);
146+
147+
/*等待队列长度减去1,并取出链表中的头元素*/
148+
pool->cur_queue_size--;
149+
CThread_worker *worker = pool->queue_head;
150+
pool->queue_head = worker->next;
151+
pthread_mutex_unlock (&(pool->queue_lock));
152+
// 调用回调函数,执行任务
153+
(*(worker->process)) (worker->arg);
154+
free (worker);
155+
worker = NULL;
156+
}
157+
}
158+
159+
#endif

Coding/Linux_OS_ThreadPoolTest.cpp

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#include <iostream>
2+
#include "Linux_OS_ThreadPool.h"
3+
using namespace std;
4+
5+
void *my_task (void *arg)
6+
{
7+
printf ("threadid is 0x%x, working on task %d\n", pthread_self(),*(int *) arg);
8+
sleep (3); //休息3秒,延长任务的执行时间
9+
return NULL;
10+
}
11+
12+
int main (int argc, char **argv)
13+
{
14+
pool_init(3); //线程池中最多三个活动线程
15+
16+
int job_num = 5;
17+
// 每隔一定时间, 向池中投入任务, 一共 5 个任务
18+
int *jobs = (int *) malloc (sizeof (int) * job_num);
19+
int i;
20+
for (i = 0; i < job_num; i++)
21+
{
22+
jobs[i] = i;
23+
sleep(i);
24+
pool_add_worker(my_task, &jobs[i]);
25+
}
26+
// 等待所有任务完成
27+
sleep(10);
28+
// 销毁线程池
29+
pool_destroy();
30+
free(jobs);
31+
return 0;
32+
}

Linux_OS/ThreadPools.md

+7-2
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,19 @@ while(1)
5353
5454
当然,如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。以上面例子来说,如果一个医生每天只能看一个病人,那么就没有必要坐诊了。
5555
56-
要实现一个可伸缩的线程池,那么至少要包括:**n个执行任务的线程,一个任务队列,一个管理线程**,然后按照下面的方式工作:
56+
要实现一个简单的线程池,只需要设计**n个执行任务的线程,一个任务队列**,然后按照下面的方式工作:
5757
5858
1. 预先启动一些线程,线程负责执行任务队列中的任务,当队列空时,线程挂起。
5959
2. 调用的时候,直接往任务队列添加任务,并发信号通知线程队列非空。
60-
3. 管理线程负责监控任务队列和系统中的线程状态,当任务队列为空,线程数目多且很多处于空闲的时候,便通知一些线程退出以节约系统资源;当任务队列排队任务多且线程都在忙,便负责再多启动一些线程来执行任务,以确保任务执行效率。
6160
61+
要实现一个可伸缩的线程池,还需要添加一个管理线程,来负责监控任务队列和系统中的线程状态,当任务队列为空,线程数目多且很多处于空闲的时候,便通知一些线程退出以节约系统资源;当任务队列排队任务多且线程都在忙,便负责再多启动一些线程来执行任务,以确保任务执行效率。
6262
63+
[Linux_OS_ThreadPool](../Coding/Linux_OS_ThreadPool.h) 是一个简单的 C 实现的线程池的例子,[Test.cpp](../Coding/Linux_OS_ThreadPoolTest.cpp) 是一个简单的测试例子, 其中:
6364
65+
* pool_add_worker():线程池的任务链表中加入一个任务,加入后通过调用 pthread_cond_signal(&(pool->queue_ready)) 唤醒一个出于阻塞状态的线程(如果有的话)。
66+
* pool_destroy():销毁线程池,线程池任务链表中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出。
67+
68+
一个更加完善的 C 实现可以在 [C-Thread-Pool](https://github.com/Pithikos/C-Thread-Pool) 找到。
6469
6570
# 更多阅读
6671

0 commit comments

Comments
 (0)