Skip to content

Commit 70d3972

Browse files
committed
Add support for distributed links
Also fix external_from_binary to avoid GC-caused corruptions and simplify API Signed-off-by: Paul Guyot <[email protected]>
1 parent ad26b6f commit 70d3972

13 files changed

+821
-249
lines changed

src/libAtomVM/context.c

+138-27
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
#define DEFAULT_STACK_SIZE 8
5353
#define BYTES_PER_TERM (TERM_BITS / 8)
5454

55-
static struct ResourceContextMonitor *context_monitors_handle_terminate(Context *ctx);
55+
static struct Monitor *context_monitors_handle_terminate(Context *ctx);
5656
static void context_distribution_handle_terminate(Context *ctx);
5757
static void destroy_extended_registers(Context *ctx, unsigned int live);
5858

@@ -174,6 +174,22 @@ void context_destroy(Context *ctx)
174174
context_unlink_ack(ctx, immediate_ref_signal->immediate, immediate_ref_signal->ref_ticks);
175175
break;
176176
}
177+
case UnlinkRemoteIDSignal: {
178+
struct TermSignal *term_signal
179+
= CONTAINER_OF(signal_message, struct TermSignal, base);
180+
uint64_t unlink_id = term_maybe_unbox_int64(term_get_tuple_element(term_signal->signal_term, 0));
181+
term remote_pid = term_get_tuple_element(term_signal->signal_term, 1);
182+
context_ack_unlink(ctx, remote_pid, unlink_id, true);
183+
break;
184+
}
185+
case UnlinkRemoteIDAckSignal: {
186+
struct TermSignal *term_signal
187+
= CONTAINER_OF(signal_message, struct TermSignal, base);
188+
uint64_t unlink_id = term_maybe_unbox_int64(term_get_tuple_element(term_signal->signal_term, 0));
189+
term remote_pid = term_get_tuple_element(term_signal->signal_term, 1);
190+
context_unlink_ack(ctx, remote_pid, unlink_id);
191+
break;
192+
}
177193
case DemonitorSignal: {
178194
struct RefSignal *ref_signal
179195
= CONTAINER_OF(signal_message, struct RefSignal, base);
@@ -200,24 +216,44 @@ void context_destroy(Context *ctx)
200216

201217
// When monitor message is sent, process is no longer in the table
202218
// and is no longer registered either.
203-
struct ResourceContextMonitor *resource_monitors = context_monitors_handle_terminate(ctx);
219+
struct Monitor *remaining_monitors = context_monitors_handle_terminate(ctx);
204220

205221
synclist_unlock(&ctx->global->processes_table);
206222

207-
// Eventually call resource monitors handlers after the processes table was unlocked
223+
// Eventually call distribution and resource monitors handlers after the processes table was unlocked
208224
// The monitors were removed from the list of monitors.
209-
if (resource_monitors) {
225+
if (remaining_monitors) {
210226
struct ListHead monitors;
211-
list_prepend(&resource_monitors->monitor.monitor_list_head, &monitors);
227+
list_prepend(&remaining_monitors->monitor_list_head, &monitors);
212228

213229
struct ListHead *item;
214230
struct ListHead *tmp;
215231
MUTABLE_LIST_FOR_EACH (item, tmp, &monitors) {
216232
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
217-
struct ResourceContextMonitor *resource_monitor = CONTAINER_OF(monitor, struct ResourceContextMonitor, monitor);
218-
struct RefcBinary *refc = refc_binary_from_data(resource_monitor->resource_obj);
219-
resource_type_fire_monitor(refc->resource_type, erl_nif_env_from_context(ctx), resource_monitor->resource_obj, ctx->process_id, resource_monitor->ref_ticks);
220-
free(monitor);
233+
switch (monitor->monitor_type) {
234+
case CONTEXT_MONITOR_RESOURCE: {
235+
struct ResourceContextMonitor *resource_monitor = CONTAINER_OF(monitor, struct ResourceContextMonitor, monitor);
236+
struct RefcBinary *refc = refc_binary_from_data(resource_monitor->resource_obj);
237+
resource_type_fire_monitor(refc->resource_type, erl_nif_env_from_context(ctx), resource_monitor->resource_obj, ctx->process_id, resource_monitor->ref_ticks);
238+
free(monitor);
239+
break;
240+
}
241+
case CONTEXT_MONITOR_LINK_REMOTE: {
242+
struct LinkRemoteMonitor *link_monitor = CONTAINER_OF(monitor, struct LinkRemoteMonitor, monitor);
243+
// Handle the case of inactive link.
244+
if (link_monitor->unlink_id != UNLINK_ID_LINK_ACTIVE) {
245+
free(monitor);
246+
continue;
247+
}
248+
dist_send_payload_exit(link_monitor, ctx->exit_reason, ctx);
249+
free(monitor);
250+
break;
251+
}
252+
case CONTEXT_MONITOR_LINK_LOCAL:
253+
case CONTEXT_MONITOR_MONITORED_LOCAL:
254+
case CONTEXT_MONITOR_MONITORING_LOCAL:
255+
UNREACHABLE();
256+
}
221257
}
222258
}
223259

@@ -430,17 +466,21 @@ bool context_get_process_info(Context *ctx, term *out, size_t *term_size, term a
430466
break;
431467
case LINKS_ATOM: {
432468
struct ListHead *item;
433-
size_t links_count = 0;
469+
ret_size = TUPLE_SIZE(2);
434470
LIST_FOR_EACH (item, &ctx->monitors_head) {
435471
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
436472
if (monitor->monitor_type == CONTEXT_MONITOR_LINK_LOCAL) {
437473
struct LinkLocalMonitor *link = CONTAINER_OF(monitor, struct LinkLocalMonitor, monitor);
438474
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
439-
links_count++;
475+
ret_size += CONS_SIZE;
476+
}
477+
} else if (monitor->monitor_type == CONTEXT_MONITOR_LINK_REMOTE) {
478+
struct LinkRemoteMonitor *link = CONTAINER_OF(monitor, struct LinkRemoteMonitor, monitor);
479+
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
480+
ret_size += CONS_SIZE + EXTERNAL_PID_SIZE;
440481
}
441482
}
442483
}
443-
ret_size = TUPLE_SIZE(2) + CONS_SIZE * links_count;
444484
break;
445485
}
446486
default:
@@ -522,6 +562,12 @@ bool context_get_process_info(Context *ctx, term *out, size_t *term_size, term a
522562
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
523563
list = term_list_prepend(link->link_local_process_id, list, heap);
524564
}
565+
} else if (monitor->monitor_type == CONTEXT_MONITOR_LINK_REMOTE) {
566+
struct LinkRemoteMonitor *link = CONTAINER_OF(monitor, struct LinkRemoteMonitor, monitor);
567+
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
568+
term external_pid = term_make_external_process_id(link->node, link->pid_number, link->pid_serial, link->creation, heap);
569+
list = term_list_prepend(external_pid, list, heap);
570+
}
525571
}
526572
}
527573
term_put_tuple_element(ret, 1, list);
@@ -535,25 +581,24 @@ bool context_get_process_info(Context *ctx, term *out, size_t *term_size, term a
535581
return true;
536582
}
537583

538-
static struct ResourceContextMonitor *context_monitors_handle_terminate(Context *ctx)
584+
static struct Monitor *context_monitors_handle_terminate(Context *ctx)
539585
{
540586
GlobalContext *glb = ctx->global;
541587
struct ListHead *item;
542588
struct ListHead *tmp;
543-
struct ResourceContextMonitor *result = NULL;
589+
struct Monitor *result = NULL;
544590
MUTABLE_LIST_FOR_EACH (item, tmp, &ctx->monitors_head) {
545591
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
546592
switch (monitor->monitor_type) {
547593
case CONTEXT_MONITOR_RESOURCE: {
548594
// monitor with resource
549595
// remove it from the list we are iterating on and
550596
// add it to the list of resource monitors to handle afterwards
551-
struct ResourceContextMonitor *resource_monitor = CONTAINER_OF(monitor, struct ResourceContextMonitor, monitor);
552597
if (result == NULL) {
553598
list_init(&monitor->monitor_list_head);
554-
result = resource_monitor;
599+
result = monitor;
555600
} else {
556-
list_append(&result->monitor.monitor_list_head, &monitor->monitor_list_head);
601+
list_append(&result->monitor_list_head, &monitor->monitor_list_head);
557602
}
558603
break;
559604
}
@@ -600,6 +645,16 @@ static struct ResourceContextMonitor *context_monitors_handle_terminate(Context
600645
free(monitor);
601646
break;
602647
}
648+
case CONTEXT_MONITOR_LINK_REMOTE: {
649+
// Process it afterwards
650+
if (result == NULL) {
651+
list_init(&monitor->monitor_list_head);
652+
result = monitor;
653+
} else {
654+
list_append(&result->monitor_list_head, &monitor->monitor_list_head);
655+
}
656+
break;
657+
}
603658
case CONTEXT_MONITOR_MONITORED_LOCAL: {
604659
struct MonitorLocalMonitor *monitored_monitor = CONTAINER_OF(monitor, struct MonitorLocalMonitor, monitor);
605660
int32_t local_process_id = term_to_local_process_id(monitored_monitor->monitor_obj);
@@ -647,15 +702,28 @@ static void context_distribution_handle_terminate(Context *ctx)
647702

648703
struct Monitor *monitor_link_new(term link_pid)
649704
{
650-
struct LinkLocalMonitor *monitor = malloc(sizeof(struct LinkLocalMonitor));
651-
if (IS_NULL_PTR(monitor)) {
652-
return NULL;
705+
if (term_is_local_pid_or_port(link_pid)) {
706+
struct LinkLocalMonitor *monitor = malloc(sizeof(struct LinkLocalMonitor));
707+
if (IS_NULL_PTR(monitor)) {
708+
return NULL;
709+
}
710+
monitor->monitor.monitor_type = CONTEXT_MONITOR_LINK_LOCAL;
711+
monitor->unlink_id = UNLINK_ID_LINK_ACTIVE;
712+
monitor->link_local_process_id = link_pid;
713+
return &monitor->monitor;
714+
} else {
715+
struct LinkRemoteMonitor *monitor = malloc(sizeof(struct LinkRemoteMonitor));
716+
if (IS_NULL_PTR(monitor)) {
717+
return NULL;
718+
}
719+
monitor->monitor.monitor_type = CONTEXT_MONITOR_LINK_REMOTE;
720+
monitor->unlink_id = UNLINK_ID_LINK_ACTIVE;
721+
monitor->node = term_get_external_node(link_pid);
722+
monitor->pid_number = term_get_external_pid_process_id(link_pid);
723+
monitor->pid_serial = term_get_external_pid_serial(link_pid);
724+
monitor->creation = term_get_external_node_creation(link_pid);
725+
return &monitor->monitor;
653726
}
654-
monitor->monitor.monitor_type = CONTEXT_MONITOR_LINK_LOCAL;
655-
monitor->unlink_id = UNLINK_ID_LINK_ACTIVE;
656-
monitor->link_local_process_id = link_pid;
657-
658-
return &monitor->monitor;
659727
}
660728

661729
struct Monitor *monitor_new(term monitor_pid, uint64_t ref_ticks, bool is_monitoring)
@@ -723,6 +791,18 @@ bool context_add_monitor(Context *ctx, struct Monitor *new_monitor)
723791
}
724792
break;
725793
}
794+
case CONTEXT_MONITOR_LINK_REMOTE: {
795+
struct LinkRemoteMonitor *new_link_monitor = CONTAINER_OF(new_monitor, struct LinkRemoteMonitor, monitor);
796+
struct LinkRemoteMonitor *existing_link_monitor = CONTAINER_OF(existing, struct LinkRemoteMonitor, monitor);
797+
if (UNLIKELY(existing_link_monitor->node == new_link_monitor->node
798+
&& existing_link_monitor->pid_number == new_link_monitor->pid_number
799+
&& existing_link_monitor->pid_serial == new_link_monitor->pid_serial
800+
&& existing_link_monitor->creation == new_link_monitor->creation)) {
801+
free(new_monitor);
802+
return false;
803+
}
804+
break;
805+
}
726806
}
727807
}
728808
}
@@ -735,7 +815,7 @@ bool context_set_unlink_id(Context *ctx, term link_pid, uint64_t *unlink_id)
735815
struct ListHead *item;
736816
LIST_FOR_EACH (item, &ctx->monitors_head) {
737817
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
738-
if (monitor->monitor_type == CONTEXT_MONITOR_LINK_LOCAL) {
818+
if (term_is_local_pid_or_port(link_pid) && monitor->monitor_type == CONTEXT_MONITOR_LINK_LOCAL) {
739819
struct LinkLocalMonitor *link = CONTAINER_OF(monitor, struct LinkLocalMonitor, monitor);
740820
if (link->link_local_process_id == link_pid) {
741821
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
@@ -747,6 +827,21 @@ bool context_set_unlink_id(Context *ctx, term link_pid, uint64_t *unlink_id)
747827
return false;
748828
}
749829
}
830+
} else if (term_is_external_pid(link_pid) && monitor->monitor_type == CONTEXT_MONITOR_LINK_REMOTE) {
831+
struct LinkRemoteMonitor *link = CONTAINER_OF(monitor, struct LinkRemoteMonitor, monitor);
832+
if (link->node == term_get_external_node(link_pid)
833+
&& link->pid_number == term_get_external_pid_process_id(link_pid)
834+
&& link->pid_serial == term_get_external_pid_serial(link_pid)
835+
&& link->creation == term_get_external_node_creation(link_pid)) {
836+
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
837+
uint64_t new_id = globalcontext_get_ref_ticks(ctx->global);
838+
link->unlink_id = new_id;
839+
*unlink_id = new_id;
840+
return true;
841+
} else {
842+
return false;
843+
}
844+
}
750845
}
751846
}
752847
return false;
@@ -757,7 +852,7 @@ void context_ack_unlink(Context *ctx, term link_pid, uint64_t unlink_id, bool pr
757852
struct ListHead *item;
758853
LIST_FOR_EACH (item, &ctx->monitors_head) {
759854
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
760-
if (monitor->monitor_type == CONTEXT_MONITOR_LINK_LOCAL) {
855+
if (term_is_local_pid_or_port(link_pid) && monitor->monitor_type == CONTEXT_MONITOR_LINK_LOCAL) {
761856
struct LinkLocalMonitor *link = CONTAINER_OF(monitor, struct LinkLocalMonitor, monitor);
762857
if (link->link_local_process_id == link_pid) {
763858
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
@@ -786,6 +881,20 @@ void context_ack_unlink(Context *ctx, term link_pid, uint64_t unlink_id, bool pr
786881
}
787882
return;
788883
}
884+
} else if (term_is_external_pid(link_pid) && monitor->monitor_type == CONTEXT_MONITOR_LINK_REMOTE) {
885+
struct LinkRemoteMonitor *link = CONTAINER_OF(monitor, struct LinkRemoteMonitor, monitor);
886+
if (link->node == term_get_external_node(link_pid)
887+
&& link->pid_number == term_get_external_pid_process_id(link_pid)
888+
&& link->pid_serial == term_get_external_pid_serial(link_pid)
889+
&& link->creation == term_get_external_node_creation(link_pid)) {
890+
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
891+
// Send ack and remove link
892+
dist_send_unlink_id_ack(unlink_id, term_from_local_process_id(ctx->process_id), link_pid, ctx);
893+
list_remove(&monitor->monitor_list_head);
894+
free(monitor);
895+
}
896+
return;
897+
}
789898
}
790899
}
791900
}
@@ -834,6 +943,7 @@ void context_demonitor(Context *ctx, uint64_t ref_ticks)
834943
}
835944
}
836945
case CONTEXT_MONITOR_LINK_LOCAL:
946+
case CONTEXT_MONITOR_LINK_REMOTE:
837947
break;
838948
}
839949
}
@@ -855,6 +965,7 @@ term context_get_monitor_pid(Context *ctx, uint64_t ref_ticks, bool *is_monitori
855965
break;
856966
}
857967
case CONTEXT_MONITOR_LINK_LOCAL:
968+
case CONTEXT_MONITOR_LINK_REMOTE:
858969
case CONTEXT_MONITOR_RESOURCE:
859970
break;
860971
}

src/libAtomVM/context.h

+13-2
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ enum ContextMonitorType
156156
CONTEXT_MONITOR_MONITORING_LOCAL,
157157
CONTEXT_MONITOR_MONITORED_LOCAL,
158158
CONTEXT_MONITOR_RESOURCE,
159+
CONTEXT_MONITOR_LINK_REMOTE,
159160
};
160161

161162
#define UNLINK_ID_LINK_ACTIVE 0x0
@@ -191,6 +192,16 @@ struct ResourceContextMonitor
191192
void *resource_obj;
192193
};
193194

195+
struct LinkRemoteMonitor
196+
{
197+
struct Monitor monitor;
198+
uint64_t unlink_id;
199+
term node;
200+
uint32_t pid_number;
201+
uint32_t pid_serial;
202+
uint32_t creation;
203+
};
204+
194205
struct ExtendedRegister
195206
{
196207
struct ListHead head;
@@ -456,10 +467,10 @@ bool context_get_process_info(Context *ctx, term *out, size_t *term_size, term a
456467
/**
457468
* @brief Half-link process to another process
458469
*
459-
* @param monitor_pid process to link to
470+
* @param link_pid process to link to (local or remote)
460471
* @return the allocated monitor or NULL if allocation failed
461472
*/
462-
struct Monitor *monitor_link_new(term monitor_pid);
473+
struct Monitor *monitor_link_new(term link_pid);
463474

464475
/**
465476
* @brief Create a monitor on a process.

0 commit comments

Comments
 (0)