diff --git a/DESCRIPTION b/DESCRIPTION index 118184fb9..e3e8d832d 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -30,6 +30,10 @@ SystemRequirements: 'libnng' >= 1.5 and 'libmbedtls' >= 2.5, or 'cmake' to compile NNG and/or Mbed TLS included in package sources Depends: R (>= 3.5) +Imports: + later +LinkingTo: + later Suggests: knitr, markdown diff --git a/NAMESPACE b/NAMESPACE index a9f83b67f..385c06dcc 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -33,6 +33,8 @@ S3method(print,recvAio) S3method(print,sendAio) S3method(print,tlsConfig) S3method(print,unresolvedValue) +S3method(promises::as.promise,recvAio) +S3method(promises::is.promising,recvAio) S3method(start,nanoDialer) S3method(start,nanoListener) export("%~>%") @@ -96,6 +98,7 @@ export(until_) export(wait) export(wait_) export(write_cert) +importFrom(later,later) importFrom(stats,start) importFrom(tools,md5sum) importFrom(utils,.DollarNames) diff --git a/R/aio.R b/R/aio.R index b0a610412..6a8119f53 100644 --- a/R/aio.R +++ b/R/aio.R @@ -118,7 +118,14 @@ recv_aio <- function(con, "integer", "logical", "numeric", "raw", "string"), timeout = NULL, n = 65536L) - data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment()) + data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment(), + function() { + cb <- data$callback + if (!is.null(cb)) { + cb(data) + } + } + ) #' Receive Async and Signal a Condition #' @@ -157,7 +164,58 @@ recv_aio_signal <- function(con, "integer", "logical", "numeric", "raw", "string"), timeout = NULL, n = 65536L) - data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment()) + data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment(), + function() { + cb <- data$callback + if (!is.null(cb)) { + cb(data) + } + } + ) + +#' @exportS3Method promises::is.promising +is.promising.recvAio <- function(x) { + TRUE +} + +#' @exportS3Method promises::as.promise +as.promise.recvAio <- function(x) { + prom <- x$promise + + if (is.null(prom)) { + prom <- promises::promise(function(resolve, reject) { + assign("callback", function(...) { + + # WARNING: x$data is heavily side-effecty! + value <- x$data + + if (is_error_value(value)) { + reject(simpleError(nng_error(value))) + } else { + resolve(value) + } + }, x) + }) + + # WARNING: x$data is heavily side-effecty! + value <- x$data + + if (!inherits(value, "unresolvedValue")) { + if (is_error_value(value)) { + prom <- promises::promise_reject(simpleError(nng_error(value))) + } else { + prom <- promises::promise_resolve(value) + } + } + + # Save for next time. This is not just an optimization but essential for + # correct behavior if as.promise is called multiple times, because only one + # `callback` can exist on the recvAio object at a time. + assign("promise", prom, x) + } + + prom +} # Core aio functions ----------------------------------------------------------- diff --git a/R/nanonext-package.R b/R/nanonext-package.R index fff6f714a..d39594d39 100644 --- a/R/nanonext-package.R +++ b/R/nanonext-package.R @@ -95,6 +95,7 @@ #' @importFrom stats start #' @importFrom tools md5sum #' @importFrom utils .DollarNames +#' @importFrom later later #' @useDynLib nanonext, .registration = TRUE #' "_PACKAGE" diff --git a/src/aio.c b/src/aio.c index 97606f517..eb80a7823 100644 --- a/src/aio.c +++ b/src/aio.c @@ -20,6 +20,7 @@ #define NANONEXT_SUPPLEMENTALS #define NANONEXT_SIGNALS #include "nanonext.h" +#include "later_shim.h" // internals ------------------------------------------------------------------- @@ -195,6 +196,21 @@ static void isaio_complete(void *arg) { } + +static void raio_invoke_cb(void* arg) { + nano_aio *raio = (nano_aio *) arg; + if (raio->cb == NULL || Rf_isNull(raio->cb)) return; + SEXP func = (SEXP)raio->cb; + SEXP callExpr, result; + if (!Rf_isNull(func)) { + PROTECT(callExpr = Rf_lcons(func, R_NilValue)); // Prepare call + PROTECT(result = Rf_eval(callExpr, R_GlobalEnv)); // Execute call + + UNPROTECT(2); + R_ReleaseObject(func); + } +} + static void raio_complete(void *arg) { nano_aio *raio = (nano_aio *) arg; @@ -210,6 +226,7 @@ static void raio_complete(void *arg) { raio->result = res - !res; #endif + later2(raio_invoke_cb, arg, 0); } static void raio_complete_signal(void *arg) { @@ -229,6 +246,7 @@ static void raio_complete_signal(void *arg) { nng_cv_wake(cv); nng_mtx_unlock(mtx); + later2(raio_invoke_cb, arg, 0); } static void request_complete_signal(void *arg) { @@ -709,7 +727,7 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) { } SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, - const SEXP bytes, const SEXP clo, nano_cv *ncv) { + const SEXP bytes, const SEXP clo, const SEXP cb, nano_cv *ncv) { const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout); const int signal = ncv != NULL; @@ -725,6 +743,12 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, raio->next = ncv; raio->type = RECVAIO; raio->mode = mod; + if (Rf_isNull(cb)) { + raio->cb = NULL; + } else { + R_PreserveObject(cb); + raio->cb = (void*)cb; + } if ((xc = nng_aio_alloc(&raio->aio, signal ? raio_complete_signal : raio_complete, raio))) goto exitlevel1; @@ -791,19 +815,19 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, } -SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) { +SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) { - return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, NULL); + return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, NULL); } -SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) { +SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) { if (R_ExternalPtrTag(cvar) != nano_CvSymbol) Rf_error("'cv' is not a valid Condition Variable"); nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar); - return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, ncv); + return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, ncv); } diff --git a/src/later_shim.cpp b/src/later_shim.cpp new file mode 100644 index 000000000..62e90e173 --- /dev/null +++ b/src/later_shim.cpp @@ -0,0 +1,5 @@ +#include + +extern "C" void later2(void (*func)(void*), void* data, double secs) { + later::later(func, data, secs); +} diff --git a/src/later_shim.h b/src/later_shim.h new file mode 100644 index 000000000..7a9345b47 --- /dev/null +++ b/src/later_shim.h @@ -0,0 +1,7 @@ +#ifndef LATER_SHIM_H +#define LATER_SHIM_H + +// This is simply a shim so that later::later can be accessed from C, not C++ +void later2(void (*func)(void*), void* data, double secs); + +#endif diff --git a/src/nanonext.h b/src/nanonext.h index 74c43e633..24a868975 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -88,6 +88,7 @@ typedef struct nano_aio_s { int result; void *data; void *next; + void *cb; } nano_aio; typedef struct nano_cv_s { @@ -250,8 +251,8 @@ SEXP rnng_protocol_open(SEXP, SEXP); SEXP rnng_random(SEXP, SEXP); SEXP rnng_reap(SEXP); SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP); -SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP); -SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_request_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_send(SEXP, SEXP, SEXP, SEXP);