7
7
* are asynchronously retrievable via NativeLib.asyncGetResult().
8
8
*
9
9
* Created by Riccardo Petrocco, Arno Bakker
10
- * Copyright 2010-2014 Delft University of Technology. All rights reserved.
10
+ * Copyright 2010-2016 Delft University of Technology. All rights reserved.
11
11
*
12
12
*/
13
13
#include < stdio.h>
19
19
#include " com_tudelft_triblerdroid_swift_NativeLib.h"
20
20
#include < sstream>
21
21
#include < map>
22
+ #include < queue>
23
+
22
24
23
25
using namespace swift ;
24
26
27
+ #define ASYNC_POLL_INTERVAL (100 *TINT_MSEC)
28
+
25
29
// httpgw.cpp functions
26
30
bool InstallHTTPGateway ( struct event_base *evbase,Address bindaddr, uint32_t chunk_size, double *maxspeed, std::string storage_dir, int32_t vod_step, int32_t min_prebuf );
27
31
bool HTTPIsSending ();
@@ -30,7 +34,7 @@ std::string HttpGwStatsGetSpeedCallback(Sha1Hash swarmid);
30
34
31
35
// Local functions
32
36
// Libevent* functions are executed by Mainloop thread,
33
- void LibeventKeepaliveCallback (int fd, short event, void *arg);
37
+ void LibeventPollAsyncCallback (int fd, short event, void *arg);
34
38
void LibeventOpenCallback (int fd, short event, void *arg);
35
39
void LibeventCloseCallback (int fd, short event, void *arg);
36
40
void LibeventGetHTTPProgressCallback (int fd, short event, void *arg);
@@ -41,7 +45,7 @@ void LibeventLiveAddCallback(int fd, short event, void *arg);
41
45
bool enginestarted = false ;
42
46
uint32_t chunk_size = SWIFT_DEFAULT_CHUNK_SIZE;
43
47
double maxspeed[2 ] = {DBL_MAX,DBL_MAX};
44
- struct event evkeepalive ;
48
+ struct event evasyncpoll ;
45
49
46
50
// for Live
47
51
LiveTransfer *livesource_lt = NULL ;
@@ -53,6 +57,7 @@ class AsyncParams
53
57
{
54
58
public:
55
59
int callid_;
60
+ event_callback_fn func_;
56
61
Sha1Hash swarmid_;
57
62
Address tracker_;
58
63
std::string filename_;
@@ -61,26 +66,26 @@ class AsyncParams
61
66
bool removestate_;
62
67
bool removecontent_;
63
68
64
- AsyncParams (Sha1Hash &swarmid, Address &tracker, std::string filename) :
65
- callid_ (-1 ), swarmid_(swarmid), tracker_(tracker), filename_(filename),
69
+ AsyncParams (event_callback_fn func, Sha1Hash &swarmid, Address &tracker, std::string filename) :
70
+ callid_ (-1 ), func_(func), swarmid_(swarmid), tracker_(tracker), filename_(filename),
66
71
data_ (NULL ), datalen_(-1 ), removestate_(false ), removecontent_(false )
67
72
{
68
73
}
69
74
70
- AsyncParams (Sha1Hash &swarmid) :
71
- callid_ (-1 ), swarmid_(swarmid), tracker_(" " ), filename_(" " ),
75
+ AsyncParams (event_callback_fn func, Sha1Hash &swarmid) :
76
+ callid_ (-1 ), func_(func), swarmid_(swarmid), tracker_(" " ), filename_(" " ),
72
77
data_ (NULL ), datalen_(-1 ), removestate_(false ), removecontent_(false )
73
78
{
74
79
}
75
80
76
- AsyncParams (char *data, int datalen) :
77
- callid_ (-1 ), swarmid_(Sha1Hash::ZERO), tracker_(" " ), filename_(" " ),
81
+ AsyncParams (event_callback_fn func, char *data, int datalen) :
82
+ callid_ (-1 ), func_(func), swarmid_(Sha1Hash::ZERO), tracker_(" " ), filename_(" " ),
78
83
data_ (data), datalen_(datalen), removestate_(false ), removecontent_(false )
79
84
{
80
85
}
81
86
82
- AsyncParams (Sha1Hash &swarmid, bool removestate, bool removecontent) :
83
- callid_ (-1 ), swarmid_(swarmid), tracker_(" " ), filename_(" " ),
87
+ AsyncParams (event_callback_fn func, Sha1Hash &swarmid, bool removestate, bool removecontent) :
88
+ callid_ (-1 ), func_(func), swarmid_(swarmid), tracker_(" " ), filename_(" " ),
84
89
data_ (NULL ), datalen_(-1 ), removestate_(removestate), removecontent_(removecontent)
85
90
{
86
91
}
@@ -93,10 +98,12 @@ class AsyncParams
93
98
};
94
99
95
100
101
+ typedef std::queue<AsyncParams *> asqueue_t ;
96
102
typedef std::map<int ,std::string> intstringmap_t ;
97
103
98
104
pthread_mutex_t asyncMutex = PTHREAD_MUTEX_INITIALIZER;
99
105
int asyncCallID=481 ; // protected by mutex
106
+ asqueue_t asyncReqQ; // protected by mutex
100
107
intstringmap_t asyncResMap; // protected by mutex
101
108
102
109
@@ -150,10 +157,11 @@ JNIEXPORT jstring JNICALL Java_com_tudelft_triblerdroid_swift_NativeLib_Init(JNI
150
157
}
151
158
}
152
159
153
- // Arno: always have some timer running. Otherwise in some cases libevent
154
- // won't execute any evtimer events added later.
155
- evtimer_assign (&evkeepalive, Channel::evbase, LibeventKeepaliveCallback, NULL );
156
- evtimer_add (&evkeepalive, tint2tv (TINT_SEC));
160
+ // Arno: as libevent is used single threaded the only way to coordinate
161
+ // the Java calling thread and the libevent processing thread is to
162
+ // let the latter poll.
163
+ evtimer_assign (&evasyncpoll, Channel::evbase, LibeventPollAsyncCallback, NULL );
164
+ evtimer_add (&evasyncpoll, tint2tv (ASYNC_POLL_INTERVAL));
157
165
158
166
// Start HTTP gateway, if requested
159
167
if (errorstr == " " && httpgwaddr!=Address ())
@@ -177,15 +185,6 @@ JNIEXPORT jstring JNICALL Java_com_tudelft_triblerdroid_swift_NativeLib_Init(JNI
177
185
}
178
186
179
187
180
-
181
- void LibeventKeepaliveCallback (int fd, short event, void *arg)
182
- {
183
- // Called every second to keep libevent timer processing alive?!
184
- evtimer_add (&evkeepalive, tint2tv (TINT_SEC));
185
- }
186
-
187
-
188
-
189
188
JNIEXPORT void JNICALL Java_com_tudelft_triblerdroid_swift_NativeLib_Mainloop (JNIEnv * env, jobject obj)
190
189
{
191
190
// Enter libevent mainloop
@@ -210,7 +209,7 @@ JNIEXPORT void JNICALL Java_com_tudelft_triblerdroid_swift_NativeLib_Shutdown(JN
210
209
/* *
211
210
* Allocates a callid for an asynchronous call and schedules it.
212
211
*/
213
- int AsyncRegisterCallback (event_callback_fn func, AsyncParams *aptr)
212
+ int AsyncRegisterCallback (AsyncParams *aptr)
214
213
{
215
214
int prc = pthread_mutex_lock (&asyncMutex);
216
215
if (prc != 0 )
@@ -221,6 +220,8 @@ int AsyncRegisterCallback(event_callback_fn func, AsyncParams *aptr)
221
220
222
221
aptr->callid_ = asyncCallID;
223
222
asyncCallID++;
223
+ asyncReqQ.push (aptr);
224
+ int retCallID = aptr->callid_ ;
224
225
225
226
prc = pthread_mutex_unlock (&asyncMutex);
226
227
if (prc != 0 )
@@ -229,34 +230,55 @@ int AsyncRegisterCallback(event_callback_fn func, AsyncParams *aptr)
229
230
return -1 ;
230
231
}
231
232
232
- // Call timer
233
- struct event *evtimerptr = new struct event ;
234
- evtimer_assign (evtimerptr,Channel::evbase,func,aptr);
235
- evtimer_add (evtimerptr,tint2tv (0 ));
236
-
237
- return aptr->callid_ ;
233
+ return retCallID;
238
234
}
239
235
240
- /* *
241
- * Sets the result of the asynchronous call identified by callid
236
+
237
+ /*
238
+ * Called every ASYNC_POLL_INTERVAL by Libevent thread to perform actual
239
+ * swift calls.
242
240
*/
243
- void AsyncSetResult (int callid, std::string result )
241
+ void LibeventPollAsyncCallback (int fd, short event, void *arg )
244
242
{
245
243
int prc = pthread_mutex_lock (&asyncMutex);
246
244
if (prc != 0 )
247
245
{
248
- dprintf (" NativeLib::AsyncSetResult : mutex_lock failed\n " );
246
+ dprintf (" NativeLib::LibeventPollAsync : mutex_lock failed\n " );
249
247
return ;
250
248
}
251
249
252
- asyncResMap[callid] = result;
250
+ while (!asyncReqQ.empty ())
251
+ {
252
+ AsyncParams *aptr = asyncReqQ.front ();
253
+ asyncReqQ.pop ();
254
+
255
+ // Make callback
256
+ aptr->func_ (fd,event,aptr);
257
+
258
+ delete aptr;
259
+ }
253
260
254
261
prc = pthread_mutex_unlock (&asyncMutex);
255
262
if (prc != 0 )
256
263
{
257
- dprintf (" NativeLib::AsyncSetResult : mutex_unlock failed\n " );
264
+ dprintf (" NativeLib::LibeventPollAsync : mutex_unlock failed\n " );
258
265
return ;
259
266
}
267
+
268
+ // Schedule next poll
269
+ evtimer_add (&evasyncpoll, tint2tv (ASYNC_POLL_INTERVAL));
270
+ }
271
+
272
+
273
+
274
+
275
+ /* *
276
+ * Sets the result of the asynchronous call identified by callid
277
+ */
278
+ void AsyncSetResult (int callid, std::string result)
279
+ {
280
+ // Assumption: asyncMutex held
281
+ asyncResMap[callid] = result;
260
282
}
261
283
262
284
@@ -323,10 +345,10 @@ JNIEXPORT jint JNICALL Java_com_tudelft_triblerdroid_swift_NativeLib_asyncOpen(
323
345
return -1 ; // "No destination could be determined"
324
346
325
347
Address tracker (trackercstr);
326
- AsyncParams *aptr = new AsyncParams (swarmid,tracker,dest);
348
+ AsyncParams *aptr = new AsyncParams (&LibeventOpenCallback, swarmid,tracker,dest);
327
349
328
350
// Register callback
329
- int callid = AsyncRegisterCallback (&LibeventOpenCallback, aptr);
351
+ int callid = AsyncRegisterCallback (aptr);
330
352
331
353
(env)->ReleaseStringUTFChars (jswarmid, swarmidcstr); // release jstring
332
354
(env)->ReleaseStringUTFChars (jtracker, trackercstr); // release jstring
@@ -357,8 +379,6 @@ void LibeventOpenCallback(int fd, short event, void *arg)
357
379
358
380
// Register result
359
381
AsyncSetResult (aptr->callid_ ,errorstr);
360
-
361
- delete aptr;
362
382
}
363
383
364
384
@@ -378,10 +398,10 @@ JNIEXPORT jint JNICALL Java_com_tudelft_triblerdroid_swift_NativeLib_asyncClose(
378
398
Sha1Hash swarmid = Sha1Hash (true ,swarmidcstr);
379
399
bool rs = (bool )jremovestate;
380
400
bool rc = (bool )jremovecontent;
381
- AsyncParams *aptr = new AsyncParams (swarmid,rs,rc);
401
+ AsyncParams *aptr = new AsyncParams (&LibeventCloseCallback, swarmid,rs,rc);
382
402
383
403
// Register callback
384
- int callid = AsyncRegisterCallback (&LibeventCloseCallback, aptr);
404
+ int callid = AsyncRegisterCallback (aptr);
385
405
386
406
(env)->ReleaseStringUTFChars (jswarmid, swarmidcstr); // release jstring
387
407
@@ -411,8 +431,6 @@ void LibeventCloseCallback(int fd, short event, void *arg)
411
431
412
432
// Register result
413
433
AsyncSetResult (aptr->callid_ ,errorstr);
414
-
415
- delete aptr;
416
434
}
417
435
418
436
@@ -470,10 +488,10 @@ JNIEXPORT jint JNICALL Java_com_tudelft_triblerdroid_swift_NativeLib_asyncGetHTT
470
488
const char * swarmidcstr = (env)->GetStringUTFChars (jswarmid, &blnIsCopy);
471
489
Sha1Hash swarmid = Sha1Hash (true ,swarmidcstr);
472
490
473
- AsyncParams *aptr = new AsyncParams (swarmid);
491
+ AsyncParams *aptr = new AsyncParams (&LibeventGetHTTPProgressCallback, swarmid);
474
492
475
493
// Register callback
476
- int callid = AsyncRegisterCallback (&LibeventGetHTTPProgressCallback, aptr);
494
+ int callid = AsyncRegisterCallback (aptr);
477
495
478
496
(env)->ReleaseStringUTFChars (jswarmid , swarmidcstr); // release jstring
479
497
@@ -493,8 +511,6 @@ void LibeventGetHTTPProgressCallback(int fd, short event, void *arg)
493
511
494
512
// Register result
495
513
AsyncSetResult (aptr->callid_ ,errorstr);
496
-
497
- delete aptr;
498
514
}
499
515
500
516
@@ -509,10 +525,10 @@ JNIEXPORT jint JNICALL Java_com_tudelft_triblerdroid_swift_NativeLib_asyncGetSta
509
525
const char * swarmidcstr = (env)->GetStringUTFChars (jswarmid, &blnIsCopy);
510
526
Sha1Hash swarmid = Sha1Hash (true ,swarmidcstr);
511
527
512
- AsyncParams *aptr = new AsyncParams (swarmid);
528
+ AsyncParams *aptr = new AsyncParams (&LibeventGetStatsCallback, swarmid);
513
529
514
530
// Register callback
515
- int callid = AsyncRegisterCallback (&LibeventGetStatsCallback, aptr);
531
+ int callid = AsyncRegisterCallback (aptr);
516
532
517
533
(env)->ReleaseStringUTFChars (jswarmid , swarmidcstr); // release jstring
518
534
@@ -533,8 +549,6 @@ void LibeventGetStatsCallback(int fd, short event, void *arg)
533
549
534
550
// Register result
535
551
AsyncSetResult (aptr->callid_ ,errorstr);
536
-
537
- delete aptr;
538
552
}
539
553
540
554
@@ -594,7 +608,7 @@ JNIEXPORT jstring JNICALL Java_com_tudelft_triblerdroid_swift_NativeLib_LiveAdd(
594
608
595
609
char *data = (char *)b;
596
610
int datalen = (int )dataLength;
597
- dprintf (" NativeLib::LiveAdd: Got %p bytes %d from java\n " , data, datalen );
611
+ // dprintf("NativeLib::LiveAdd: Got %p bytes %d from java\n", data, datalen );
598
612
599
613
if (data != NULL && datalen > 0 )
600
614
{
@@ -603,17 +617,19 @@ JNIEXPORT jstring JNICALL Java_com_tudelft_triblerdroid_swift_NativeLib_LiveAdd(
603
617
char *copydata = new char [datalen];
604
618
memcpy (copydata,data,datalen);
605
619
606
- AsyncParams *aptr = new AsyncParams (copydata,datalen);
620
+ AsyncParams *aptr = new AsyncParams (&LibeventLiveAddCallback, copydata,datalen);
607
621
608
622
// Register callback
609
- (void )AsyncRegisterCallback (&LibeventLiveAddCallback, aptr);
623
+ (void )AsyncRegisterCallback (aptr);
610
624
}
611
625
612
626
env->ReleaseByteArrayElements (dataArray, b, JNI_ABORT);
613
627
614
628
return env->NewStringUTF (" " );
615
629
}
616
630
631
+
632
+
617
633
/*
618
634
* Add live data to libevent evbuffer, to be turned into chunks when >=chunk_size
619
635
* has been added.
@@ -641,8 +657,6 @@ void LibeventLiveAddCallback(int fd, short event, void *arg)
641
657
if (ret < 0 )
642
658
print_error (" live: create: error evbuffer_drain" );
643
659
}
644
-
645
- delete aptr;
646
660
}
647
661
648
662
0 commit comments