diff --git a/onvm/onvm_mgr/main.c b/onvm/onvm_mgr/main.c
index 4c1bbd53d..6b5e73ca1 100644
--- a/onvm/onvm_mgr/main.c
+++ b/onvm/onvm_mgr/main.c
@@ -149,6 +149,10 @@ master_thread_main(void) {
                         rte_atomic16_set(nf_wakeup_infos[i].shm_server, 0);
                         sem_post(nf_wakeup_infos[i].mutex);
                 }
+                /* Wake any pool NF parked on its pool semaphore so it can exit */
+                if (nfs[i].pool_status.pool_sleep_state == 1) {
+                        nfs[i].pool_status.pool_sleep_state = 0;
+                        sem_post(nf_pool_wakeup_infos[i].mutex);
+                }
         }
 
         /* Wait to process all exits */
@@ -171,6 +175,12 @@ master_thread_main(void) {
                 }
         }
 
+        /* Clean up the nf pool structs */
+        for (i = 0; i < MAX_NFS; i++) {
+                sem_close(nf_pool_wakeup_infos[i].mutex);
+                sem_unlink(nf_pool_wakeup_infos[i].sem_name);
+        }
+
         RTE_LOG(INFO, APP, "Core %d: Master thread done\n", rte_lcore_id());
 }
 
diff --git a/onvm/onvm_mgr/onvm_init.c b/onvm/onvm_mgr/onvm_init.c
index a0d9e7527..f39f1f06e 100644
--- a/onvm/onvm_mgr/onvm_init.c
+++ b/onvm/onvm_mgr/onvm_init.c
@@ -56,6 +56,7 @@ struct port_info *ports = NULL;
 struct core_status *cores = NULL;
 struct onvm_configuration *onvm_config = NULL;
 struct nf_wakeup_info *nf_wakeup_infos = NULL;
+struct nf_wakeup_info *nf_pool_wakeup_infos = NULL;
 struct rte_mempool *pktmbuf_pool;
 struct rte_mempool *nf_init_cfg_pool;
 
@@ -86,6 +87,9 @@ init_port(uint8_t port_num);
 static void
 init_shared_sem(void);
 
+static void
+init_pool_sem(void);
+
 static int
 init_info_queue(void);
 
@@ -244,6 +248,9 @@ init(int argc, char *argv[]) {
         /* initialise the shared memory for shared core mode */
         init_shared_sem();
 
+        /* Init mutex's for NF pool structures */
+        init_pool_sem();
+
         /*initialize a default service chain*/
         default_chain = onvm_sc_create();
         retval = onvm_sc_append_entry(default_chain, ONVM_NF_ACTION_TONF, 1);
@@ -263,7 +270,6 @@ init(int argc, char *argv[]) {
         onvm_sc_print(default_chain);
 
         onvm_flow_dir_init();
-
         return 0;
 }
 
@@ -446,6 +452,28 @@ init_shared_sem(void) {
         }
 }
 
+/* Create one named wakeup semaphore per possible NF for the pool feature. */
+static void
+init_pool_sem(void) {
+        uint16_t i;
+        sem_t *mutex;
+        char *sem_name;
+
+        /* rte_calloc takes (type, num, size, align) — num before size */
+        nf_pool_wakeup_infos = rte_calloc("POOL_SHM_INFOS", MAX_NFS, sizeof(struct nf_wakeup_info), 0);
+        for (i = 0; i < MAX_NFS; i++) {
+                sem_name = rte_malloc(NULL, sizeof(char) * 64, 0);
+                snprintf(sem_name, 64, "nf_pool_%d", i);
+                nf_pool_wakeup_infos[i].sem_name = sem_name;
+
+                /* mode 0666 (not 06666 — no setuid/setgid/sticky bits on a semaphore) */
+                mutex = sem_open(sem_name, O_CREAT, 0666, 0);
+                if (mutex == SEM_FAILED) {
+                        fprintf(stderr, "can not create semaphore for NF %d\n", i);
+                        sem_unlink(sem_name);
+                        exit(1);
+                }
+                nf_pool_wakeup_infos[i].mutex = mutex;
+        }
+}
+
 /**
  * Allocate a rte_ring for newly created NFs
  */
diff --git a/onvm/onvm_mgr/onvm_init.h b/onvm/onvm_mgr/onvm_init.h
index f133038e4..8319eedfe 100644
--- a/onvm/onvm_mgr/onvm_init.h
+++ b/onvm/onvm_mgr/onvm_init.h
@@ -129,6 +129,7 @@ extern uint8_t ONVM_NF_SHARE_CORES;
 
 /* For handling shared core logic */
 extern struct nf_wakeup_info *nf_wakeup_infos;
+extern struct nf_wakeup_info *nf_pool_wakeup_infos;
 
 /**********************************Functions**********************************/
 
diff --git a/onvm/onvm_mgr/onvm_nf.c b/onvm/onvm_mgr/onvm_nf.c
index 674424ed0..54baf3de1 100644
--- a/onvm/onvm_mgr/onvm_nf.c
+++ b/onvm/onvm_mgr/onvm_nf.c
@@ -108,6 +108,16 @@ onvm_nf_relocate_nf(uint16_t nf, uint16_t new_core);
 static void
 onvm_nf_init_lpm_region(struct lpm_request *req_lpm);
 
+/*
+ * Function that initializes an rte_ring
+ *
+ * Input  : the address of a ring request struct
+ * Output : a return code based on initialization of the ring object
+ *
+ */
+static void
+onvm_nf_init_single_ring(struct ring_request *ring_req);
+
 /*
  * Function that initializes a hashtable for a flow_table struct
  *
@@ -173,6 +183,8 @@ onvm_nf_check_status(void) {
         struct onvm_nf_init_cfg *nf_init_cfg;
         struct lpm_request *req_lpm;
         struct ft_request *ft;
+        struct ring_request *ring_req;
+        struct id_request *id_req;
         uint16_t stop_nf_id;
         int num_msgs = rte_ring_count(incoming_msg_queue);
 
@@ -195,12 +207,21 @@ onvm_nf_check_status(void) {
                                 ft = (struct ft_request *) msg->msg_data;
                                 onvm_nf_init_ft(ft);
                                 break;
+                        case MSG_REQUEST_RING:
+                                ring_req = (struct ring_request *) msg->msg_data;
+                                onvm_nf_init_single_ring(ring_req);
+                                break;
                         case MSG_NF_STARTING:
                                 nf_init_cfg = (struct onvm_nf_init_cfg *)msg->msg_data;
                                 if (onvm_nf_start(nf_init_cfg) == 0) {
                                         onvm_stats_gen_event_nf_info("NF Starting", &nfs[nf_init_cfg->instance_id]);
                                 }
                                 break;
+                        case MSG_REQUEST_ID:
+                                /* NOTE(review): next_instance_id is handed out without being advanced
+                                 * here; concurrent requesters can receive the same ID — confirm intent */
+                        case MSG_REQUEST_ID:
+                                id_req = (struct id_request *) msg->msg_data;
+                                id_req->instance_id = next_instance_id;
+                                id_req->status = 0;
+                                break;
                         case MSG_NF_READY:
                                 nf = (struct onvm_nf *)msg->msg_data;
                                 if (onvm_nf_ready(nf) == 0) {
@@ -291,6 +312,7 @@ onvm_nf_start(struct onvm_nf_init_cfg *nf_init_cfg) {
                 return 1;
         }
 
+        spawned_nf->pool_status.pool_sleep_state = 0;
         spawned_nf->instance_id = nf_id;
         spawned_nf->service_id = nf_init_cfg->service_id;
         spawned_nf->status = NF_STARTING;
@@ -454,6 +476,17 @@ onvm_nf_init_ft(struct ft_request *ft) {
         }
 }
 
+static void
+onvm_nf_init_single_ring(struct ring_request *ring_req) {
+        struct rte_ring *ring;
+
+        ring = rte_ring_create(ring_req->name, ring_req->count, rte_socket_id(), RING_F_SC_DEQ);
+        if (ring) {
+                ring_req->status = 0;
+        } else {
+                ring_req->status = -1;
+        }
+}
+
 inline int
 onvm_nf_relocate_nf(uint16_t dest, uint16_t new_core) {
         uint16_t *msg_data;
diff --git a/onvm/onvm_mgr/onvm_stats.c b/onvm/onvm_mgr/onvm_stats.c
index 6f0e370f8..a83e8e2ba 100644
--- a/onvm/onvm_mgr/onvm_stats.c
+++ b/onvm/onvm_mgr/onvm_stats.c
@@ -408,6 +408,7 @@ onvm_stats_display_nfs(unsigned difftime, uint8_t verbosity_level) {
         /* For same service id TX/RX stats */
         uint8_t print_total_stats = 0;
+        uint64_t pool_count = 0;
         uint64_t rx_for_service[MAX_SERVICES];
         uint64_t tx_for_service[MAX_SERVICES];
         uint64_t rx_drop_for_service[MAX_SERVICES];
@@ -445,6 +446,10 @@ onvm_stats_display_nfs(unsigned difftime, uint8_t verbosity_level) {
         for (i = 0; i < MAX_NFS; i++) {
                 if (!onvm_nf_is_valid(&nfs[i]))
                         continue;
+                if (nfs[i].pool_status.pool_sleep_state == 1) {
+                        pool_count++;
+                        continue;
+                }
                 const uint64_t rx = nfs[i].stats.rx;
                 const uint64_t rx_drop = nfs[i].stats.rx_drop;
                 const uint64_t tx = nfs[i].stats.tx;
@@ -572,6 +577,7 @@ onvm_stats_display_nfs(unsigned difftime, uint8_t verbosity_level) {
                 onvm_stats_display_client_wakeup_thread_context(difftime);
         }
 
+        fprintf(stats_out, "\n\nNumber of NF's in pool: %" PRIu64 "\n--------------------------\n", pool_count);
 }
 
 /***************************Helper functions**********************************/
diff --git a/onvm/onvm_nflib/onvm_common.h b/onvm/onvm_nflib/onvm_common.h
index e2b8adcc1..f4c87f187 100755
--- a/onvm/onvm_nflib/onvm_common.h
+++ b/onvm/onvm_nflib/onvm_common.h
@@ -270,6 +270,7 @@ struct onvm_nf {
         uint16_t instance_id;
         uint16_t service_id;
         uint8_t status;
+        uint8_t pool_nf;
         char *tag;
         /* Pointer to NF defined state data */
         void *data;
@@ -324,6 +325,17 @@ struct onvm_nf {
                 /* Mutex for NF sem_wait */
                 sem_t *nf_mutex;
         } shared_core;
+
+        struct {
+                /*
+                 * Sleep state to track state of whether the NF is active
+                 * Same logic as shared core
+                 */
+                volatile int16_t pool_sleep_state;
+                sem_t *pool_mutex;
+                const char *pool_mutex_name;
+                char *binary_executable;
+        } pool_status;
 };
 
 /*
@@ -342,6 +354,14 @@ struct onvm_nf_init_cfg {
         uint16_t pkt_limit;
 };
 
+struct onvm_nf_pool_ctx {
+        struct rte_ring *pool_ring;
+        const char *args;
+        const char *binary_executable;
+        const char *nf_name;
+        unsigned refill;
+};
+
 /*
  * Define a structure to describe a service chain entry
  */
@@ -372,6 +392,34 @@ struct ft_request {
         int status;
 };
 
+struct ring_request {
+        char *name;
+        unsigned count;
+        unsigned flags;
+        int status;
+};
+
+struct id_request {
+        int instance_id;
+        int status;
+};
+
+struct simple_forward_args {
+        const char *service_id;
+        const char *destination_id;
+        struct {
+                const char *print_delay;
+        } optional_args;
+};
+
+struct aes_decrypt_args {
+        const char *service_id;
+        const char *destination_id;
+        struct {
+                const char *print_delay;
+        } optional_args;
+};
+
 /* define common names for structures shared between server and NF */
 #define MP_NF_RXQ_NAME "MProc_Client_%u_RX"
 #define MP_NF_TXQ_NAME "MProc_Client_%u_TX"
@@ -390,6 +438,7 @@ struct ft_request {
 #define _NF_MSG_QUEUE_NAME "NF_%u_MSG_QUEUE"
 #define _NF_MEMPOOL_NAME "NF_INFO_MEMPOOL"
 #define _NF_MSG_POOL_NAME "NF_MSG_MEMPOOL"
+#define _NF_POOL_NAME "onvm_nf_pool"
 
 /* interrupt semaphore specific updates */
 #define SHMSZ 4  // size of shared memory segement (page_size)
@@ -411,6 +460,8 @@ struct ft_request {
 #define NF_CORE_BUSY 12  // The manually selected core is busy
 #define NF_WAITING_FOR_LPM 13  // NF is waiting for a LPM request to be fulfilled
 #define NF_WAITING_FOR_FT 14  // NF is waiting for a flow-table request to be fulfilled
+#define NF_WAITING_FOR_RING 15  // NF is waiting for a ring request to be fulfilled
+#define NF_WAITING_FOR_INSTANCE_ID 16  // NF is waiting for the manager to return the next instance ID
 
 #define NF_NO_ID -1
 
diff --git a/onvm/onvm_nflib/onvm_includes.h b/onvm/onvm_nflib/onvm_includes.h
index e2c6dedb1..888ba384f 100644
--- a/onvm/onvm_nflib/onvm_includes.h
+++ b/onvm/onvm_nflib/onvm_includes.h
@@ -85,6 +85,7 @@
 #include
 #include
 #include
+#include <rte_jhash.h>
 
 /******************************Internal headers*******************************/
 
diff --git a/onvm/onvm_nflib/onvm_msg_common.h b/onvm/onvm_nflib/onvm_msg_common.h
index a285401a9..e9d57ff5f 100644
--- a/onvm/onvm_nflib/onvm_msg_common.h
+++ b/onvm/onvm_nflib/onvm_msg_common.h
@@ -55,6 +55,8 @@
 #define MSG_REQUEST_LPM_REGION 7
 #define MSG_CHANGE_CORE 8
 #define MSG_REQUEST_FT 9
+#define MSG_REQUEST_RING 10
+#define MSG_REQUEST_ID 11
 
 struct onvm_nf_msg {
         uint8_t msg_type; /* Constant saying what type of message is */
diff --git a/onvm/onvm_nflib/onvm_nflib.c b/onvm/onvm_nflib/onvm_nflib.c
index 2e5ecb33b..5d9901d28 100755
--- a/onvm/onvm_nflib/onvm_nflib.c
+++ b/onvm/onvm_nflib/onvm_nflib.c
@@ -70,6 +70,7 @@
 #define NF_MODE_RING 2
 
 #define ONVM_NO_CALLBACK NULL
+#define TIMEOUT_NF_REQUEST 3
 
 /******************************Global Variables*******************************/
 
@@ -101,6 +102,9 @@
 static struct onvm_nf_local_ctx *main_nf_local_ctx;
 // Global NF specific signal handler
 static handle_signal_func global_nf_signal_handler = NULL;
 
+// Global NF pool table
+struct rte_hash *pool_map;
+
 // Shared data for default service chain
 struct onvm_service_chain *default_chain;
 
@@ -220,7 +224,6 @@ onvm_nflib_thread_main_loop(void *arg);
  */
 static void
 init_shared_core_mode_info(uint16_t instance_id);
-
 /*
  * Signal handler to catch SIGINT/SIGTERM.
  *
@@ -230,6 +233,25 @@ init_shared_core_mode_info(uint16_t instance_id);
 void
 onvm_nflib_handle_signal(int signal);
 
+/*
+ * Forks NF's and enqueues them into a given ring. The ring stores NF struct pointers corresponding
+ * to its pool.
+ */
+int
+onvm_nflib_fork_pool_nfs(const char *nf_name, const char *nf_args, struct rte_ring *nf_pool_ring, int nf_count);
+
+/**
+ * Obtains the nfpool hashmap used for NF pool API
+ */
+struct rte_hash *
+onvm_nflib_get_nfpool_hashmap(void);
+
+/**
+ * Function to initialize the semaphore used for pool nf's
+ */
+void
+init_pool_info(uint16_t instance_id);
+
 /************************************API**************************************/
 
 struct onvm_nf_local_ctx *
@@ -287,6 +309,33 @@ onvm_nflib_request_lpm(struct lpm_request *lpm_req) {
         return lpm_req->status;
 }
 
+int
+onvm_nflib_request_ring(struct ring_request *ring_req) {
+        struct onvm_nf_msg *request_message;
+        int ret;
+
+        ret = rte_mempool_get(nf_msg_pool, (void **)(&request_message));
+        if (ret != 0)
+                return ret;
+
+        request_message->msg_type = MSG_REQUEST_RING;
+        request_message->msg_data = ring_req;
+
+        /* Mark the request pending BEFORE it is visible to the manager,
+         * otherwise the manager's completion write could be overwritten
+         * and this loop would spin forever */
+        ring_req->status = NF_WAITING_FOR_RING;
+        ret = rte_ring_enqueue(mgr_msg_queue, request_message);
+        if (ret < 0) {
+                rte_mempool_put(nf_msg_pool, request_message);
+                return ret;
+        }
+
+        while (ring_req->status == NF_WAITING_FOR_RING) {
+                sleep(1);
+        }
+
+        rte_mempool_put(nf_msg_pool, request_message);
+        return ring_req->status;
+}
+
 int
 onvm_nflib_request_ft(struct rte_hash_parameters *ipv4_hash_params) {
         struct onvm_nf_msg *request_message;
@@ -530,6 +579,7 @@ onvm_nflib_start_nf(struct onvm_nf_local_ctx *nf_local_ctx, struct onvm_nf_init_
                 RTE_LOG(INFO, APP, "Shared CPU support enabled\n");
                 init_shared_core_mode_info(nf->instance_id);
         }
+        init_pool_info(nf->instance_id);
 
         RTE_LOG(INFO, APP, "Using Instance ID %d\n", nf->instance_id);
         RTE_LOG(INFO, APP, "Using Service ID %d\n", nf->service_id);
@@ -600,6 +650,10 @@ onvm_nflib_thread_main_loop(void *arg) {
                         }
                 }
 
+                if (unlikely(nf->pool_status.pool_sleep_state == 1)) {
+                        sem_wait(nf->pool_status.pool_mutex);
+                }
+
                 nb_pkts_added =
                     onvm_nflib_dequeue_packets((void **)pkts, nf_local_ctx, nf->function_table->pkt_handler);
 
@@ -949,16 +1003,62 @@ onvm_nflib_lookup_shared_structs(void) {
         if (mgr_msg_queue == NULL)
                 rte_exit(EXIT_FAILURE, "Cannot get mgr message ring");
 
+        pool_map = onvm_nflib_get_nfpool_hashmap();
+        if (pool_map == NULL) {
+                rte_exit(EXIT_FAILURE, "Could not lookup/create hashmap\n");
+        }
+
         return 0;
 }
 
+struct rte_hash *
+onvm_nflib_get_nfpool_hashmap(void) {
+        struct rte_hash_parameters *ipv4_hash_params;
+        struct rte_hash *map;
+        size_t nf_pool_name_size;
+        char *name;
+        int status;
+
+        if ((map = rte_hash_find_existing(_NF_POOL_NAME)) != NULL) {
+                return map;
+        }
+
+        ipv4_hash_params = (struct rte_hash_parameters *) rte_malloc(NULL, sizeof(struct rte_hash_parameters), 0);
+        if (!ipv4_hash_params) {
+                return NULL;
+        }
+
+        nf_pool_name_size = strlen(_NF_POOL_NAME) + 1;
+        name = rte_malloc(NULL, 64, 0);
+        if (!name) {
+                rte_free(ipv4_hash_params);
+                return NULL;
+        }
+        /* create the shared NF pool hash table under the fixed _NF_POOL_NAME key */
+        ipv4_hash_params->entries = 256;
+        ipv4_hash_params->key_len = 64;
+        ipv4_hash_params->hash_func = rte_jhash;
+        ipv4_hash_params->hash_func_init_val = 0;
+        ipv4_hash_params->name = name;
+        ipv4_hash_params->socket_id = rte_socket_id();
+        snprintf(name, nf_pool_name_size, "%s", _NF_POOL_NAME);
+
+        status = onvm_nflib_request_ft(ipv4_hash_params);
+        if (status < 0) {
+                return NULL;
+        }
+        RTE_LOG(INFO, APP, "Looking up:%s\n", name);
+        map = rte_hash_find_existing(name);
+        if (map == NULL) {
+                return NULL;
+        }
+
+        return map;
+}
+
 static void
 onvm_nflib_parse_config(struct onvm_configuration *config) {
         ONVM_NF_SHARE_CORES = config->flags.ONVM_NF_SHARE_CORES;
 }
 
 static inline uint16_t
-onvm_nflib_dequeue_packets(void **pkts, struct onvm_nf_local_ctx *nf_local_ctx, nf_pkt_handler_fn handler) {
+onvm_nflib_dequeue_packets(void **pkts, struct onvm_nf_local_ctx *nf_local_ctx, nf_pkt_handler_fn handler) {
         struct onvm_nf *nf;
         struct onvm_pkt_meta *meta;
         uint16_t i, nb_pkts;
@@ -1091,7 +1191,6 @@ onvm_nflib_is_scale_info_valid(struct onvm_nf_scale_info *scale_info) {
                scale_info->function_table->pkt_handler != NULL;
 }
 
-
 static void
 onvm_nflib_nf_tx_mgr_init(struct onvm_nf *nf) {
         nf->nf_tx_mgr = rte_zmalloc(NULL, sizeof(struct queue_mgr), RTE_CACHE_LINE_SIZE);
@@ -1316,6 +1415,21 @@ init_shared_core_mode_info(uint16_t instance_id) {
         nf->shared_core.sleep_state = (rte_atomic16_t *)shm;
 }
 
+void
+init_pool_info(uint16_t instance_id) {
+        struct onvm_nf *nf;
+        char *sem_name;
+
+        nf = &nfs[instance_id];
+        sem_name = rte_malloc(NULL, sizeof(char) * 64, 0);
+        snprintf(sem_name, 64, "nf_pool_%d", instance_id);
+
+        nf->pool_status.pool_mutex = sem_open(sem_name, 0, 0666, 0);
+        nf->pool_status.pool_mutex_name = sem_name;
+        if (nf->pool_status.pool_mutex == SEM_FAILED)
+                rte_exit(EXIT_FAILURE, "Unable to open semaphore for NF %d\n", instance_id);
+}
+
 void
 onvm_nflib_stats_summary_output(uint16_t id) {
         const char clr[] = {27, '[', '2', 'J', '\0'};
@@ -1411,3 +1525,239 @@ onvm_nflib_stats_summary_output(uint16_t id) {
         printf("CSV file written to %s directory\n", nf_tag);
         free(csv_filename);
 }
+
+int
+onvm_nflib_pool_enqueue(const char *nf_name, const char *nf_args, int eq_num, int refill) {
+        struct rte_ring *nf_pool_ring;
+        struct ring_request *rte_ring_request;
+        char *nf_ring_name;
+        char *global_nf_name;
+        int spawned_nf_count, ret;
+        struct onvm_nf_pool_ctx *pool_ctx = NULL;
+
+        if (nf_args == NULL) {
+                RTE_LOG(INFO, APP, "Invalid NF args string\n");
+                return -1;
+        }
+
+        if (eq_num <= 0) {
+                RTE_LOG(INFO, APP, "Invalid amount of NF's requested for pool\n");
+                return -1;
+        }
+
+        global_nf_name = rte_calloc(NULL, 1, 64, 0);
+        if (global_nf_name == NULL) {
+                RTE_LOG(INFO, APP, "Could not allocate NF name key\n");
+                return -1;
+        }
+        snprintf(global_nf_name, 64, "%s", nf_name);
+        ret = rte_hash_lookup_data(pool_map, global_nf_name, (void *) &pool_ctx);
+        if (ret < 0) {
+                /* First request for this NF name: create the pool ring + context */
+                pool_ctx = rte_malloc(NULL, sizeof(struct onvm_nf_pool_ctx), 0);
+                rte_ring_request = rte_malloc(NULL, sizeof(struct ring_request), 0);
+                nf_ring_name = rte_malloc(NULL, sizeof(char) * 64, 0);
+                if (rte_ring_request == NULL || nf_ring_name == NULL || pool_ctx == NULL) {
+                        RTE_LOG(INFO, APP, "Could not allocate ring request objects\n");
+                        return -1;
+                }
+                rte_ring_request->count = 128;
+                snprintf(nf_ring_name, 64, "%s", nf_name);
+                rte_ring_request->name = nf_ring_name;
+
+                if (onvm_nflib_request_ring(rte_ring_request) == -1) {
+                        RTE_LOG(INFO, APP, "Could not request a ring for %s from manager\n", nf_name);
+                        return -1;
+                }
+
+                RTE_LOG(INFO, APP, "Created %s nf pool ring\n", nf_name);
+                nf_pool_ring = rte_ring_lookup(nf_name);
+                if (nf_pool_ring == NULL) {
+                        RTE_LOG(INFO, APP, "Could not lookup %s ring\n", nf_name);
+                        return -1;
+                }
+                pool_ctx->pool_ring = nf_pool_ring;
+                pool_ctx->args = nf_args;
+                pool_ctx->nf_name = nf_name;
+                /* rte_malloc does not zero memory; refill must not be read uninitialized */
+                pool_ctx->refill = 0;
+                if (rte_hash_add_key_data(pool_map, global_nf_name, (void *) pool_ctx) != 0) {
+                        RTE_LOG(INFO, APP, "Could not add to hash table\n");
+                        return -1;
+                }
+
+                rte_free(rte_ring_request);
+                rte_free(nf_ring_name);
+        }
+
+        if (refill >= 0) {
+                pool_ctx->refill = refill;
+        }
+
+        spawned_nf_count = onvm_nflib_fork_pool_nfs(pool_ctx->nf_name, pool_ctx->args, pool_ctx->pool_ring, eq_num);
+        return spawned_nf_count;
+}
+
+int
+onvm_nflib_fork_pool_nfs(const char *nf_name, const char *nf_args, struct rte_ring *nf_pool_ring, int nf_count) {
+        struct onvm_nf *spawned_nf;
+        int i, count_sleep;
+
+        for (i = 0; i < nf_count; i++) {
+                spawned_nf = onvm_nflib_fork(nf_name, nf_args);
+                if (spawned_nf == NULL) {
+                        RTE_LOG(INFO, APP, "Could not fork NF %d of requested %d\n", i, nf_count);
+                        return i;
+                }
+                count_sleep = 0;
+                while (spawned_nf->status != NF_RUNNING) {
+                        if (count_sleep == TIMEOUT_NF_REQUEST) {
+                                RTE_LOG(INFO, APP, "nf %d of requested %d nf request timed out\n", i, nf_count);
+                                return i;
+                        }
+                        count_sleep++;
+                        sleep(1);
+                }
+                if (rte_ring_enqueue(nf_pool_ring, (void *) spawned_nf) != 0) {
+                        RTE_LOG(INFO, APP, "Failed to enqueue spawned NF into the ring\n");
+                        break;
+                }
+                spawned_nf->pool_status.pool_sleep_state = 1;
+        }
+
+        return i;
+}
+
+int
+onvm_nflib_pool_dequeue(const char *nf_name, int dq_num, int refill) {
+        struct rte_ring *nf_pool_ring;
+        struct onvm_nf_pool_ctx *pool_ctx;
+        struct onvm_nf *dequeued_nf = NULL;
+        sem_t *sem;
+        int i, ret, total_refill, spawned_nf_count;
+        char *global_nf_name;
+
+        global_nf_name = rte_calloc(NULL, 1, 64, 0);
+        if (global_nf_name == NULL) {
+                RTE_LOG(INFO, APP, "Could not allocate NF name key\n");
+                return -1;
+        }
+        snprintf(global_nf_name, 64, "%s", nf_name);
+        ret = rte_hash_lookup_data(pool_map, global_nf_name, (void *) &pool_ctx);
+        rte_free(global_nf_name);
+        if (ret < 0) {
+                RTE_LOG(INFO, APP, "NF does not have an associated pool\n");
+                return -1;
+        }
+
+        nf_pool_ring = pool_ctx->pool_ring;
+        for (i = 0; i < dq_num; i++) {
+                if (rte_ring_dequeue(nf_pool_ring, (void *) &dequeued_nf) != 0) {
+                        RTE_LOG(INFO, APP, "Could not dequeue %s, ring is empty\n", nf_name);
+                        break;
+                }
+                sem = sem_open(dequeued_nf->pool_status.pool_mutex_name, 0, 0666, 0);
+                if (sem == SEM_FAILED) {
+                        RTE_LOG(INFO, APP, "Could not post to semaphore\n");
+                        break;
+                }
+                dequeued_nf->pool_status.pool_sleep_state = 0;
+                ret = sem_post(sem);
+                sem_close(sem);  /* avoid leaking one handle per wakeup */
+                if (ret < 0) {
+                        RTE_LOG(INFO, APP, "Could not post to semaphore\n");
+                        break;
+                }
+        }
+        if (refill >= 0) {
+                pool_ctx->refill = refill;
+        }
+
+        if (rte_ring_count(nf_pool_ring) < pool_ctx->refill) {
+                total_refill = pool_ctx->refill - rte_ring_count(nf_pool_ring);
+                spawned_nf_count = onvm_nflib_fork_pool_nfs(pool_ctx->nf_name, pool_ctx->args,
+                                                            nf_pool_ring, total_refill);
+                RTE_LOG(INFO, APP, "Refilled NF pool with %d NF's\n", spawned_nf_count);
+        }
+
+        return i;
+}
+
+struct onvm_nf *
+onvm_nflib_fork(const char *nf_name, const char *nf_args) {
+        char *go_script_path;
+        int instance_id;
+
+        if (nf_name == NULL) {
+                RTE_LOG(INFO, APP, "onvm_nflib_fork(): nf_name is null\n");
+                return NULL;
+        }
+        if (nf_args == NULL) {
+                RTE_LOG(INFO, APP, "onvm_nflib_fork(): nf_args is null\n");
+                return NULL;
+        }
+
+        instance_id = onvm_nflib_request_next_instance_id();
+        if (instance_id < 0 || instance_id >= MAX_NFS) {
+                RTE_LOG(INFO, APP, "Could not obtain a valid instance ID\n");
+                return NULL;
+        }
+        go_script_path = onvm_nflib_get_go_script_path();
+        if (go_script_path == NULL) {
+                return NULL;
+        }
+        if (fork() == 0) {
+                char *command;
+                int ret;
+                ret = asprintf(&command, "%s %s %s", go_script_path, nf_name, nf_args);
+                if (ret < 0) {
+                        RTE_LOG(INFO, APP, "Could not allocate command string for bash script call\n");
+                        exit(1);
+                }
+                ret = system(command);
+                if (ret < 0) {
+                        RTE_LOG(INFO, APP, "Could not execute NF go script\n");
+                        exit(1);
+                }
+                /* the child must not fall back into library/caller code */
+                exit(0);
+        }
+
+        return &nfs[instance_id];
+}
+
+char *
+onvm_nflib_get_go_script_path(void) {
+        char *token;
+        const char *examples_path = "/examples/";
+        const char *go_script = "start_nf.sh";
+        char *go_script_path;
+        size_t total_directory_size;
+        char *cwd = rte_malloc(0, sizeof(char) * 4096, 0);
+
+        if (cwd == NULL || getcwd(cwd, 4096) == NULL) {
+                RTE_LOG(INFO, APP, "Could not extract current working CWD");
+                rte_free(cwd);
+                return NULL;
+        }
+
+        /* Truncate the cwd at "/examples/" (if present) to get the repo root */
+        token = strstr(cwd, examples_path);
+        if (token != NULL)
+                *token = '\0';
+
+        /* +1 for the NUL terminator; the original buffer was undersized and
+         * strncat'd onto uninitialized memory */
+        total_directory_size = strlen(cwd) + strlen(examples_path) + strlen(go_script) + 1;
+        go_script_path = rte_malloc(0, sizeof(char) * total_directory_size, 0);
+        if (go_script_path == NULL) {
+                rte_free(cwd);
+                return NULL;
+        }
+        snprintf(go_script_path, total_directory_size, "%s%s%s", cwd, examples_path, go_script);
+
+        rte_free(cwd);
+        return go_script_path;
+}
+
+int
+onvm_nflib_request_next_instance_id(void) {
+        struct id_request *get_next_id;
+        struct onvm_nf_msg *request_message;
+        int ret, instance_id;
+
+        ret = rte_mempool_get(nf_msg_pool, (void **)(&request_message));
+        if (ret != 0)
+                return ret;
+
+        get_next_id = rte_malloc(NULL, sizeof(struct id_request), 0);
+        if (get_next_id == NULL) {
+                rte_mempool_put(nf_msg_pool, request_message);
+                return -1;
+        }
+        request_message->msg_type = MSG_REQUEST_ID;
+        request_message->msg_data = get_next_id;
+
+        /* Mark pending BEFORE the manager can see the message (see request_ring) */
+        get_next_id->status = NF_WAITING_FOR_INSTANCE_ID;
+        ret = rte_ring_enqueue(mgr_msg_queue, request_message);
+        if (ret < 0) {
+                rte_mempool_put(nf_msg_pool, request_message);
+                rte_free(get_next_id);
+                return ret;
+        }
+
+        while (get_next_id->status == NF_WAITING_FOR_INSTANCE_ID) {
+                sleep(1);
+        }
+        instance_id = get_next_id->instance_id;
+
+        rte_mempool_put(nf_msg_pool, request_message);
+        rte_free(get_next_id);
+        return instance_id;
+}
diff --git a/onvm/onvm_nflib/onvm_nflib.h b/onvm/onvm_nflib/onvm_nflib.h
index 858f95e45..178210d24 100644
--- a/onvm/onvm_nflib/onvm_nflib.h
+++ b/onvm/onvm_nflib/onvm_nflib.h
@@ -270,6 +270,20 @@ onvm_nflib_scale(struct onvm_nf_scale_info *scale_info);
 int
 onvm_nflib_request_lpm(struct lpm_request *req);
 
+/**
+ * Request ring data structure. Return success or failure of this initialization
+ * @param ring_request
+ * @return response status
+ */
+int
+onvm_nflib_request_ring(struct ring_request *ring_req);
+
+/**
+ * Requests the next available instance ID from the manager. Not MT safe.
+ */
+int
+onvm_nflib_request_next_instance_id(void);
+
 /*
  * Initializes a flow_tables hashmap. Returns the status code, representing the success or failure of the initialization
  *
@@ -297,4 +311,37 @@ onvm_nflib_get_onvm_config(void);
 void
 onvm_nflib_stats_summary_output(uint16_t id);
 
+/**
+ * Enqueues an NF into its corresponding pool.
+ * Input: Name of the nf, args to the nf, number of nf's to enqueue, (optional) pool refill threshold
+ * Output: Number of nf's enqueued into pool, -1 on error
+ */
+int
+onvm_nflib_pool_enqueue(const char *nf_name, const char *nf_args, int nf_count, int refill);
+
+/**
+ * Dequeues NF from its corresponding pool
+ * Input: Name of the nf, amount of nf's to dequeue, (optional) pool refill threshold
+ * Output: Number of nf's dequeued from pool, -1 on error
+ */
+int
+onvm_nflib_pool_dequeue(const char *nf_name, int nf_count, int refill_threshold);
+
+/**
+ * Forks a NF based off its name. Assumes the NF is within the examples directory.
+ * Input: Name of the nf, argument struct to the NF
+ * Output: Pointer to NF struct that was forked
+ */
+struct onvm_nf *
+onvm_nflib_fork(const char *nf_name, const char *nf_args);
+
+/**
+ * Creates a path to the start_nf.sh script from the calling process, which is used
+ * to dynamically start NF's during runtime
+ * Input: None
+ * Output: String to binary executable
+ */
+char *
+onvm_nflib_get_go_script_path(void);
+
 #endif  // _ONVM_NFLIB_H_