Skip to content

Commit 9a016e2

Browse files
committed
修复了线程池中BUG;修复了iocp及win32消息事件引擎的BUG
1 parent b20e0b6 commit 9a016e2

File tree

13 files changed

+146
-58
lines changed

13 files changed

+146
-58
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ endif
4242
##############################################################################
4343

4444
.PHONY = check help all clean install uninstall uninstall_all build_bin build_src
45-
VERSION = 3.0.11
45+
VERSION = 3.0.17
4646

4747
help:
4848
@(echo "usage: make help|all|clean|install|uninstall|uninstall_all|build_bin|build_src")

changes.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
�޸���ʷ�б���
22
------------------------------------------------------------------------
3+
67) 2014.2.21 --- acl 3.0.17 �汾������(���̳߳ؿ�������� BUG�����Է����˽����汾)
34
66) 2014.2.17 --- acl 3.0.16 �汾������
45
65) 2014.1.25 --- acl 3.0.15 �汾������
56
64) 2014.1.11

lib_acl/changes.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
�޸���ʷ�б���
22

3+
------------------------------------------------------------------------
4+
427) 2014.2.21
5+
427.1) bugfix: events_wmsg.c ���ڻص�������û�н� stream ����Դ��ݣ����Ӱ��
6+
���� win32 ��Ϣ�¼����첽 IO ����
7+
427.2) bugfix: events_iocp.c �������ر���ɶ˿�ʱ������δ��״̬����Ӧ��ǰ�ͷ�
8+
�ص��ṹ����(http://support.microsoft.com/kb/192800/zh-cn)
9+
427.3) bugfix: acl_pthread_pool.c �߳��������������IJ�����������ָ��Խ������
10+
311
------------------------------------------------------------------------
412

513
426) 2014.2.17

lib_acl/lib_acl.rc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ END
5353
//
5454

5555
VS_VERSION_INFO VERSIONINFO
56-
FILEVERSION 2,1,2,9
57-
PRODUCTVERSION 2,1,2,9
56+
FILEVERSION 3,0,1,7
57+
PRODUCTVERSION 3,0,1,7
5858
FILEFLAGSMASK 0x17L
5959
#ifdef _DEBUG
6060
FILEFLAGS 0x1L
@@ -71,12 +71,12 @@ BEGIN
7171
BEGIN
7272
VALUE "Comments", "����Ϊ��ƽ̨��C�⣬����������ͨѶ����������ܵȹ���"
7373
VALUE "FileDescription", "acl ��"
74-
VALUE "FileVersion", "2, 1, 2, 9"
74+
VALUE "FileVersion", "3, 0, 1, 7"
7575
VALUE "InternalName", "lib_acl"
7676
VALUE "LegalCopyright", "zsx (C) 2011"
7777
VALUE "OriginalFilename", "lib_acl.lib"
7878
VALUE "ProductName", " acl ��"
79-
VALUE "ProductVersion", "2, 1, 2, 9"
79+
VALUE "ProductVersion", "3, 0, l, 7"
8080
END
8181
END
8282
BLOCK "VarFileInfo"

lib_acl/src/event/events_iocp.c

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ struct IOCP_EVENT {
5050
int type;
5151
#define IOCP_EVENT_READ (1 << 0)
5252
#define IOCP_EVENT_WRITE (1 << 2)
53+
#define IOCP_EVENT_DEAD (1 << 3)
54+
5355
ACL_EVENT_FDTABLE *fdp;
5456

5557
#define ACCEPT_ADDRESS_LENGTH ((sizeof(struct sockaddr_in) + 16))
@@ -73,20 +75,44 @@ static void stream_on_close(ACL_VSTREAM *stream, void *arg)
7375
}
7476

7577
/* 必须在释放 fdp->event_read/fdp->event_write 前关闭套接口句柄 */
76-
77-
if (ACL_VSTREAM_SOCK(stream) != ACL_SOCKET_INVALID && stream->close_fn)
78+
shutdown(ACL_VSTREAM_SOCK(stream), 0);
79+
shutdown(ACL_VSTREAM_SOCK(stream), 1);
80+
if (ACL_VSTREAM_SOCK(stream) != ACL_SOCKET_INVALID
81+
&& stream->close_fn)
82+
{
7883
(void) stream->close_fn(ACL_VSTREAM_SOCK(stream));
79-
else if (ACL_VSTREAM_FILE(stream) != ACL_FILE_INVALID && stream->fclose_fn)
84+
} else if (ACL_VSTREAM_FILE(stream) != ACL_FILE_INVALID
85+
&& stream->fclose_fn)
86+
{
8087
(void) stream->fclose_fn(ACL_VSTREAM_FILE(stream));
88+
}
89+
8190
ACL_VSTREAM_SOCK(stream) = ACL_SOCKET_INVALID;
8291
ACL_VSTREAM_FILE(stream) = ACL_FILE_INVALID;
8392

8493
if (fdp->event_read) {
85-
acl_myfree(fdp->event_read);
94+
/* 如果完成端口处于未决状态,则不能释放重叠结构,需在主循环的
95+
* GetQueuedCompletionStatus 调用后来释放
96+
*/
97+
if (HasOverlappedIoCompleted(&fdp->event_read->overlapped))
98+
acl_myfree(fdp->event_read);
99+
else {
100+
fdp->event_read->type = IOCP_EVENT_DEAD;
101+
fdp->event_read->fdp = NULL;
102+
}
86103
fdp->event_read = NULL;
87104
}
88105
if (fdp->event_write) {
89-
acl_myfree(fdp->event_write);
106+
/* 如果完成端口处于未决状态,则不能释放重叠结构,需在主循环的
107+
* GetQueuedCompletionStatus 调用后来释放
108+
*/
109+
if (HasOverlappedIoCompleted(&fdp->event_write->overlapped))
110+
acl_myfree(fdp->event_write);
111+
else {
112+
fdp->event_write->type = IOCP_EVENT_DEAD;
113+
fdp->event_write->fdp = NULL;
114+
}
115+
90116
fdp->event_write = NULL;
91117
}
92118

@@ -741,42 +767,64 @@ static void event_loop(ACL_EVENT *eventp)
741767
DWORD lastError = 0;
742768
IOCP_EVENT *iocp_event = NULL;
743769

744-
isSuccess = GetQueuedCompletionStatus(ev->h_iocp, &bytesTransferred,
745-
(DWORD*) &fdp, (OVERLAPPED**) &iocp_event, delay);
770+
isSuccess = GetQueuedCompletionStatus(ev->h_iocp,
771+
&bytesTransferred, (DWORD*) &fdp,
772+
(OVERLAPPED**) &iocp_event, delay);
773+
746774
if (!isSuccess) {
747775
if (iocp_event == NULL)
748776
break;
749-
if (!(fdp->event_type & (ACL_EVENT_XCPT | ACL_EVENT_RW_TIMEOUT))) {
777+
if (iocp_event->type == IOCP_EVENT_DEAD)
778+
acl_myfree(iocp_event);
779+
else if (iocp_event->fdp == NULL) {
780+
acl_msg_warn("%s(%d): fdp null",
781+
myname, __LINE__);
782+
acl_myfree(iocp_event);
783+
} else if (iocp_event->fdp != fdp)
784+
acl_msg_fatal("%s(%d): invalid fdp",
785+
myname, __LINE__);
786+
else if (!(fdp->event_type & (ACL_EVENT_XCPT
787+
| ACL_EVENT_RW_TIMEOUT)))
788+
{
750789
fdp->event_type |= ACL_EVENT_XCPT;
751790
fdp->fdidx_ready = eventp->fdcnt_ready;
752-
eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp;
791+
eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp;
792+
eventp->fdcnt_ready++;
753793
}
754794
continue;
755795
}
756796

757797
acl_assert(fdp == iocp_event->fdp);
758798

759-
if ((fdp->event_type & (ACL_EVENT_XCPT | ACL_EVENT_RW_TIMEOUT)))
799+
if ((fdp->event_type & (ACL_EVENT_XCPT
800+
| ACL_EVENT_RW_TIMEOUT)))
801+
{
760802
continue;
803+
}
804+
761805
if (iocp_event->type == IOCP_EVENT_READ) {
762806
acl_assert(fdp->event_read == iocp_event);
763807
iocp_event->type &= ~IOCP_EVENT_READ;
764808
fdp->stream->sys_read_ready = 1;
765-
if ((fdp->event_type & (ACL_EVENT_READ | ACL_EVENT_WRITE)) == 0)
809+
if ((fdp->event_type & (ACL_EVENT_READ
810+
| ACL_EVENT_WRITE)) == 0)
766811
{
767812
fdp->event_type |= ACL_EVENT_READ;
768813
fdp->fdidx_ready = eventp->fdcnt_ready;
769-
eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp;
814+
eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp;
815+
eventp->fdcnt_ready++;
770816
}
771817
}
772818
if (iocp_event->type == IOCP_EVENT_WRITE) {
773819
acl_assert(fdp->event_write == iocp_event);
774820
iocp_event->type &= ~IOCP_EVENT_WRITE;
775-
if ((fdp->event_type & (ACL_EVENT_READ | ACL_EVENT_WRITE)) == 0)
821+
if ((fdp->event_type & (ACL_EVENT_READ
822+
| ACL_EVENT_WRITE)) == 0)
776823
{
777824
fdp->event_type |= ACL_EVENT_WRITE;
778825
fdp->fdidx_ready = eventp->fdcnt_ready;
779-
eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp;
826+
eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp;
827+
eventp->fdcnt_ready++;
780828
}
781829
}
782830
delay = 0;

lib_acl/src/event/events_wmsg.c

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -481,10 +481,10 @@ static void handleClose(EVENT_WMSG *ev, ACL_SOCKET sockfd)
481481
return;
482482
else if (fdp->r_callback)
483483
fdp->r_callback(ACL_EVENT_XCPT, &ev->event,
484-
NULL, fdp->r_context);
484+
fdp->stream, fdp->r_context);
485485
else if (fdp->w_callback)
486486
fdp->w_callback(ACL_EVENT_XCPT, &ev->event,
487-
NULL, fdp->w_context);
487+
fdp->stream, fdp->w_context);
488488
/*
489489
else
490490
acl_msg_error("%s(%d): w_callback and r_callback null"
@@ -506,7 +506,7 @@ static void handleConnect(EVENT_WMSG *ev, ACL_SOCKET sockfd)
506506
else {
507507
fdp->stream->flag &= ~ACL_VSTREAM_FLAG_CONNECTING;
508508
fdp->w_callback(ACL_EVENT_WRITE, &ev->event,
509-
NULL, fdp->w_context);
509+
fdp->stream, fdp->w_context);
510510
}
511511
}
512512

@@ -520,7 +520,8 @@ static void handleAccept(EVENT_WMSG *ev, ACL_SOCKET sockfd)
520520
else if (fdp->r_callback == NULL)
521521
acl_msg_fatal("%s(%d): fdp callback null", myname, __LINE__);
522522

523-
fdp->r_callback(ACL_EVENT_READ, &ev->event, NULL, fdp->r_context);
523+
fdp->r_callback(ACL_EVENT_READ, &ev->event,
524+
fdp->stream, fdp->r_context);
524525
}
525526

526527
static void handleRead(EVENT_WMSG *ev, ACL_SOCKET sockfd)
@@ -532,15 +533,15 @@ static void handleRead(EVENT_WMSG *ev, ACL_SOCKET sockfd)
532533
acl_msg_error("%s(%d): fdp null for sockfd(%d)",
533534
myname, __LINE__, (int) sockfd);
534535
else if ((fdp->stream->type & ACL_VSTREAM_TYPE_LISTEN))
535-
fdp->r_callback(ACL_EVENT_READ, &ev->event, NULL,
536-
fdp->r_context);
536+
fdp->r_callback(ACL_EVENT_READ, &ev->event,
537+
fdp->stream, fdp->r_context);
537538
else if (fdp->r_callback != NULL) {
538539
/* 该描述字可读则设置 ACL_VSTREAM 的系统可读标志从而触发
539540
* ACL_VSTREAM 流在读时调用系统的 read 函数
540541
*/
541542
fdp->stream->sys_read_ready = 1;
542543
fdp->r_callback(ACL_EVENT_READ, &ev->event,
543-
NULL, fdp->r_context);
544+
fdp->stream, fdp->r_context);
544545
}
545546
/* else
546547
acl_msg_error("%s(%d): fdp->r_callback null for sockfd(%d)",
@@ -560,7 +561,7 @@ static void handleWrite(EVENT_WMSG *ev, ACL_SOCKET sockfd)
560561
handleConnect(ev, sockfd);
561562
else if (fdp->w_callback != NULL)
562563
fdp->w_callback(ACL_EVENT_WRITE, &ev->event,
563-
NULL, fdp->w_context);
564+
fdp->stream, fdp->w_context);
564565
/*
565566
else
566567
acl_msg_error("%s(%d): fdp->w_callback null for sockfd(%d)",

lib_acl/src/init/acl_init.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
#include "init.h"
2626

27-
static char *version = "lib_acl_3.0.16";
27+
static char *version = "lib_acl_3.0.17";
2828

2929
const char *acl_version(void)
3030
{

lib_acl/src/thread/acl_pthread_pool.c

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ struct acl_pthread_pool_t {
8383
thread_worker *thr_first; /* first idle thread */
8484
thread_worker *thr_iter; /* for bat operation */
8585
thread_cond *cond_first;
86-
thread_cond *cond_last;
8786
int poller_running; /* is poller thread running ? */
8887
int qlen; /* the work queue's length */
8988
int job_nslot;
@@ -193,7 +192,7 @@ static thread_cond *thread_cond_create(void)
193192
thread_cond *cond = (thread_cond*)
194193
acl_mycalloc(1, sizeof(thread_cond));
195194

196-
acl_pthread_cond_init(&cond->cond, NULL);
195+
acl_assert(acl_pthread_cond_init(&cond->cond, NULL) == 0);
197196
return cond;
198197
}
199198

@@ -207,7 +206,6 @@ static thread_worker *worker_create(acl_pthread_pool_t *thr_pool)
207206
{
208207
thread_worker *thr = (thread_worker*) acl_mycalloc(1,
209208
sizeof(thread_worker));
210-
thread_cond *cond = thr_pool->cond_first;
211209

212210
thr->id = (unsigned long) acl_pthread_self();
213211
thr->idle = thr_pool->idle_timeout;
@@ -222,27 +220,20 @@ static thread_worker *worker_create(acl_pthread_pool_t *thr_pool)
222220
} else
223221
thr->idle = 0;
224222

225-
if (cond == NULL) {
226-
cond = thread_cond_create();
227-
acl_assert(acl_pthread_cond_init(&cond->cond, NULL) == 0);
228-
} else {
229-
thr_pool->cond_first = cond->next;
230-
if (thr_pool->cond_last == cond)
231-
thr_pool->cond_last = NULL;
232-
}
223+
if (thr_pool->cond_first != NULL) {
224+
thr->cond = thr_pool->cond_first;
225+
thr_pool->cond_first = thr_pool->cond_first->next;
226+
} else
227+
thr->cond = thread_cond_create();
233228

234-
thr->cond = cond;
235229
thr->mutex = &thr_pool->worker_mutex;
236230
return thr;
237231
}
238232

239233
static void worker_free(acl_pthread_pool_t *thr_pool, thread_worker *thr)
240234
{
241-
if (thr_pool->cond_first == NULL)
242-
thr_pool->cond_first = thr->cond;
243-
else
244-
thr_pool->cond_last->next = thr->cond;
245-
thr_pool->cond_last = thr->cond;
235+
thr->cond->next = thr_pool->cond_first;
236+
thr_pool->cond_first = thr->cond;
246237
acl_myfree(thr);
247238
}
248239

@@ -431,19 +422,18 @@ static void *worker_thread(void* arg)
431422
}
432423
}
433424

434-
thr = worker_create(thr_pool);
435-
acl_assert(thr->mutex == &thr_pool->worker_mutex);
436-
mutex = thr->mutex;
437-
438425
/* lock the thread pool's global mutex at first */
439426

440-
status = acl_pthread_mutex_lock(mutex);
427+
status = acl_pthread_mutex_lock(&thr_pool->worker_mutex);
441428
if (status != 0) {
442429
SET_ERRNO(status);
443430
acl_msg_fatal("%s(%d), %s: lock failed: %s", __FILE__,
444431
__LINE__, myname, acl_last_serror());
445432
}
446433

434+
thr = worker_create(thr_pool);
435+
mutex = thr->mutex;
436+
447437
for (;;) {
448438

449439
/* handle thread self's job first */
@@ -498,7 +488,7 @@ static void *worker_thread(void* arg)
498488
status = acl_pthread_mutex_unlock(mutex);
499489
if (status != 0) {
500490
SET_ERRNO(status);
501-
acl_msg_error("%s, %s(%d): unlock error(%s)",
491+
acl_msg_fatal("%s, %s(%d): unlock error(%s)",
502492
__FILE__, myname, __LINE__, acl_last_serror());
503493
}
504494

@@ -943,7 +933,6 @@ static void thread_pool_init(acl_pthread_pool_t *thr_pool)
943933
thr_pool->schedule_warn = 100;
944934
thr_pool->schedule_wait = 100;
945935
thr_pool->cond_first = NULL;
946-
thr_pool->cond_last = NULL;
947936
}
948937

949938
/* create work queue */

lib_acl_cpp/samples/aio/aio_server/main.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,8 @@ class io_callback : public aio_callback
195195
*/
196196
bool timeout_callback()
197197
{
198-
std::cout << "Timeout ..." << std::endl;
199-
return (true);
198+
std::cout << "Timeout, delete it ..." << std::endl;
199+
return (false);
200200
}
201201

202202
private:
@@ -239,7 +239,7 @@ class io_accept_callback : public aio_accept_callback
239239
client->add_timeout_callback(callback);
240240

241241
// 从异步流读一行数据
242-
client->gets(10, false);
242+
client->gets(3, false);
243243
return (true);
244244
}
245245
};
@@ -251,7 +251,7 @@ static void usage(const char* procname)
251251

252252
int main(int argc, char* argv[])
253253
{
254-
bool use_kernel = true;
254+
bool use_kernel = false;
255255
int ch;
256256

257257
while ((ch = getopt(argc, argv, "hk")) > 0)

0 commit comments

Comments
 (0)