Skip to content

Commit ece9a73

Browse files
authored
Add scheduled processing for inbox activities (#2376)
1 parent e483fc8 commit ece9a73

File tree

10 files changed

+559
-544
lines changed

10 files changed

+559
-544
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Significance: minor
2+
Type: changed
3+
4+
Improved inbox performance by batching and deduplicating activities, reducing redundant processing and improving handling during high activity periods.

includes/class-handler.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
use Activitypub\Handler\Create;
1414
use Activitypub\Handler\Delete;
1515
use Activitypub\Handler\Follow;
16-
use Activitypub\Handler\Inbox;
1716
use Activitypub\Handler\Like;
1817
use Activitypub\Handler\Move;
1918
use Activitypub\Handler\Quote_Request;
@@ -42,7 +41,6 @@ public static function register_handlers() {
4241
Create::init();
4342
Delete::init();
4443
Follow::init();
45-
Inbox::init();
4644
Like::init();
4745
Move::init();
4846
Quote_Request::init();

includes/class-scheduler.php

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public static function init() {
6161
\add_action( 'activitypub_reprocess_outbox', array( self::class, 'reprocess_outbox' ) );
6262
\add_action( 'activitypub_outbox_purge', array( self::class, 'purge_outbox' ) );
6363
\add_action( 'activitypub_inbox_purge', array( self::class, 'purge_inbox' ) );
64+
\add_action( 'activitypub_inbox_create_item', array( self::class, 'process_inbox_activity' ) );
6465

6566
\add_action( 'post_activitypub_add_to_outbox', array( self::class, 'schedule_outbox_activity_for_federation' ) );
6667
\add_action( 'post_activitypub_add_to_outbox', array( self::class, 'schedule_announce_activity' ), 10, 4 );
@@ -375,6 +376,53 @@ public static function purge_inbox() {
375376
}
376377
}
377378

379+
/**
380+
* Process cached inbox activity.
381+
*
382+
* Retrieves all collected user IDs for an activity and processes them together.
383+
*
384+
* @param string $activity_id The activity ID.
385+
*/
386+
public static function process_inbox_activity( $activity_id ) {
387+
// Deduplicate if multiple inbox items were created due to race condition.
388+
$inbox_item = Inbox::deduplicate( $activity_id );
389+
if ( ! $inbox_item ) {
390+
return;
391+
}
392+
393+
$data = \json_decode( $inbox_item->post_content, true );
394+
// Reconstruct activity from inbox post.
395+
$activity = Activity::init_from_array( $data );
396+
$type = \Activitypub\camel_to_snake_case( $activity->get_type() );
397+
$context = Inbox::CONTEXT_INBOX;
398+
$user_ids = Inbox::get_recipients( $inbox_item->ID );
399+
400+
/**
401+
* Fires after any ActivityPub Inbox activity has been handled, regardless of activity type.
402+
*
403+
* This hook is triggered for all activity types processed by the inbox handler.
404+
*
405+
* @param array $data The data array.
406+
* @param array $user_ids The user IDs.
407+
* @param string $type The type of the activity.
408+
* @param Activity $activity The Activity object.
409+
* @param int $result The ID of the inbox item that was created, or WP_Error if failed.
410+
* @param string $context The context of the request ('inbox' or 'shared_inbox').
411+
*/
412+
\do_action( 'activitypub_handled_inbox', $data, $user_ids, $type, $activity, $inbox_item->ID, $context );
413+
414+
/**
415+
* Fires after an ActivityPub Inbox activity has been handled.
416+
*
417+
* @param array $data The data array.
418+
* @param array $user_ids The user IDs.
419+
* @param Activity $activity The Activity object.
420+
* @param int $result The ID of the inbox item that was created, or WP_Error if failed.
421+
* @param string $context The context of the request ('inbox' or 'shared_inbox').
422+
*/
423+
\do_action( 'activitypub_handled_inbox_' . $type, $data, $user_ids, $activity, $inbox_item->ID, $context );
424+
}
425+
378426
/**
379427
* Update schedules when outbox purge days settings change.
380428
*

includes/collection/class-inbox.php

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,18 @@ public static function remove_recipient( $post_id, $user_id ) {
330330
return \delete_post_meta( $post_id, '_activitypub_user_id', $user_id );
331331
}
332332

333+
/**
334+
* Add multiple recipients to an existing inbox activity.
335+
*
336+
* @param int $post_id The inbox post ID.
337+
* @param int[] $user_ids The user ID or array of user IDs to add.
338+
*/
339+
public static function add_recipients( $post_id, $user_ids ) {
340+
foreach ( $user_ids as $user_id ) {
341+
self::add_recipient( $post_id, $user_id );
342+
}
343+
}
344+
333345
/**
334346
* Get an inbox item by GUID for a specific recipient.
335347
*
@@ -358,4 +370,40 @@ public static function get_by_guid_and_recipient( $guid, $user_id ) {
358370

359371
return $post;
360372
}
373+
374+
/**
375+
* Deduplicate inbox items with the same GUID.
376+
*
377+
* If multiple inbox items exist with the same GUID (due to race conditions),
378+
* this merges all recipients into the first post and deletes duplicates.
379+
*
380+
* @param string $guid The activity GUID.
381+
*
382+
* @return \WP_Post|false The primary inbox post, or false if no posts found.
383+
*/
384+
public static function deduplicate( $guid ) {
385+
$duplicates = \get_posts(
386+
array(
387+
'post_type' => self::POST_TYPE,
388+
'guid' => $guid,
389+
'posts_per_page' => -1,
390+
'post_status' => 'any',
391+
)
392+
);
393+
394+
if ( empty( $duplicates ) ) {
395+
return false;
396+
}
397+
398+
// Keep the first post, all others are duplicates.
399+
$primary = array_shift( $duplicates );
400+
401+
foreach ( $duplicates as $duplicate ) {
402+
$recipients = \get_post_meta( $duplicate->ID, '_activitypub_user_id', false );
403+
self::add_recipients( $primary->ID, $recipients );
404+
\wp_delete_post( $duplicate->ID, true );
405+
}
406+
407+
return $primary;
408+
}
361409
}

includes/rest/class-actors-inbox-controller.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use function Activitypub\camel_to_snake_case;
1515
use function Activitypub\get_masked_wp_version;
1616
use function Activitypub\get_rest_url_by_path;
17+
use function Activitypub\object_to_uri;
1718

1819
/**
1920
* Actors_Inbox_Controller class.
@@ -197,6 +198,28 @@ public function create_item( $request ) {
197198
* @param string $context The context of the request.
198199
*/
199200
\do_action( 'activitypub_inbox_' . $type, $data, $user_id, $activity, Inbox::CONTEXT_INBOX );
201+
202+
/**
203+
* Filter to skip inbox storage.
204+
*
205+
* Skip inbox storage for debugging purposes or to reduce load for
206+
* certain Activity-Types, like "Delete".
207+
*
208+
* @param bool $skip Whether to skip inbox storage.
209+
* @param array $data The activity data array.
210+
*
211+
* @return bool Whether to skip inbox storage.
212+
*/
213+
$skip = \apply_filters( 'activitypub_skip_inbox_storage', false, $data );
214+
215+
if ( ! $skip ) {
216+
$activity_id = object_to_uri( $data );
217+
218+
Inbox::add( $activity, (array) $user_id );
219+
220+
\wp_clear_scheduled_hook( 'activitypub_inbox_create_item', array( $activity_id ) );
221+
\wp_schedule_single_event( time() + 15, 'activitypub_inbox_create_item', array( $activity_id ) );
222+
}
200223
}
201224

202225
$response = \rest_ensure_response(

includes/rest/class-inbox-controller.php

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,46 @@ public function create_item( $request ) {
230230
* @param string $context The context of the request.
231231
*/
232232
\do_action( 'activitypub_inbox_shared_' . $type, $data, $allowed_recipients, $activity, Inbox::CONTEXT_SHARED_INBOX );
233+
234+
/**
235+
* Filter to skip inbox storage.
236+
*
237+
* Skip inbox storage for debugging purposes or to reduce load for
238+
* certain Activity-Types, like "Delete".
239+
*
240+
* @param bool $skip Whether to skip inbox storage.
241+
* @param array $data The activity data array.
242+
*
243+
* @return bool Whether to skip inbox storage.
244+
*/
245+
$skip = \apply_filters( 'activitypub_skip_inbox_storage', false, $data );
246+
247+
if ( ! $skip ) {
248+
$result = Inbox::add( $activity, $allowed_recipients );
249+
250+
/**
251+
* Fires after an ActivityPub Inbox activity has been handled.
252+
*
253+
* @param array $data The data array.
254+
* @param array $user_ids The user IDs.
255+
* @param string $type The type of the activity.
256+
* @param Activity|\WP_Error $activity The Activity object.
257+
* @param \WP_Error|int $result The ID of the inbox item that was created, or WP_Error if failed.
258+
* @param string $context The context of the request ('inbox' or 'shared_inbox').
259+
*/
260+
\do_action( 'activitypub_handled_inbox', $data, $allowed_recipients, $type, $activity, $result, Inbox::CONTEXT_SHARED_INBOX );
261+
262+
/**
263+
* Fires after an ActivityPub Inbox activity has been handled.
264+
*
265+
* @param array $data The data array.
266+
* @param array $user_ids The user IDs.
267+
* @param Activity|\WP_Error $activity The Activity object.
268+
* @param \WP_Error|int $result The ID of the inbox item that was created, or WP_Error if failed.
269+
* @param string $context The context of the request ('inbox' or 'shared_inbox').
270+
*/
271+
\do_action( 'activitypub_handled_inbox_' . $type, $data, $allowed_recipients, $activity, $result, Inbox::CONTEXT_SHARED_INBOX );
272+
}
233273
}
234274

235275
$response = \rest_ensure_response(

tests/phpunit/tests/includes/collection/class-test-inbox.php

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,4 +579,148 @@ public function test_add_activity_with_duplicate_recipients() {
579579
$this->assertContains( 2, $recipients );
580580
$this->assertContains( 3, $recipients );
581581
}
582+
583+
/**
584+
* Test add_recipients function.
585+
*
586+
* @covers ::add_recipients
587+
*/
588+
public function test_add_recipients() {
589+
$activity = new Activity();
590+
$activity->set_id( 'https://remote.example.com/activities/add-recipients' );
591+
$activity->set_type( 'Create' );
592+
$activity->set_actor( 'https://remote.example.com/users/testuser' );
593+
594+
$object = new Base_Object();
595+
$object->set_id( 'https://remote.example.com/objects/add-recipients' );
596+
$object->set_type( 'Note' );
597+
$activity->set_object( $object );
598+
599+
$inbox_id = Inbox::add( $activity, 1 );
600+
601+
// Add multiple recipients at once.
602+
Inbox::add_recipients( $inbox_id, array( 2, 3, 4 ) );
603+
604+
// Verify all recipients were added.
605+
$recipients = Inbox::get_recipients( $inbox_id );
606+
$this->assertCount( 4, $recipients );
607+
$this->assertContains( 1, $recipients );
608+
$this->assertContains( 2, $recipients );
609+
$this->assertContains( 3, $recipients );
610+
$this->assertContains( 4, $recipients );
611+
}
612+
613+
/**
614+
* Test deduplicate function with no duplicates.
615+
*
616+
* @covers ::deduplicate
617+
*/
618+
public function test_deduplicate_no_duplicates() {
619+
$activity = new Activity();
620+
$activity->set_id( 'https://remote.example.com/activities/single-item' );
621+
$activity->set_type( 'Create' );
622+
$activity->set_actor( 'https://remote.example.com/users/testuser' );
623+
624+
$object = new Base_Object();
625+
$object->set_id( 'https://remote.example.com/objects/single-item' );
626+
$object->set_type( 'Note' );
627+
$activity->set_object( $object );
628+
629+
$inbox_id = Inbox::add( $activity, 1 );
630+
631+
// Deduplicate should return the same post.
632+
$result = Inbox::deduplicate( 'https://remote.example.com/activities/single-item' );
633+
$this->assertInstanceOf( 'WP_Post', $result );
634+
$this->assertEquals( $inbox_id, $result->ID );
635+
}
636+
637+
/**
638+
* Test deduplicate function with duplicates.
639+
*
640+
* @covers ::deduplicate
641+
*/
642+
public function test_deduplicate_with_duplicates() {
643+
$activity = new Activity();
644+
$activity->set_id( 'https://remote.example.com/activities/duplicate-guid' );
645+
$activity->set_type( 'Create' );
646+
$activity->set_actor( 'https://remote.example.com/users/testuser' );
647+
648+
$object = new Base_Object();
649+
$object->set_id( 'https://remote.example.com/objects/duplicate-guid' );
650+
$object->set_type( 'Note' );
651+
$activity->set_object( $object );
652+
653+
// Manually create duplicate inbox posts with same GUID.
654+
$inbox_id_1 = \wp_insert_post(
655+
array(
656+
'post_type' => Inbox::POST_TYPE,
657+
'post_status' => 'publish',
658+
'post_content' => \wp_json_encode( $activity->to_array() ),
659+
'guid' => 'https://remote.example.com/activities/duplicate-guid',
660+
)
661+
);
662+
\add_post_meta( $inbox_id_1, '_activitypub_user_id', 1 );
663+
\add_post_meta( $inbox_id_1, '_activitypub_user_id', 2 );
664+
665+
$inbox_id_2 = \wp_insert_post(
666+
array(
667+
'post_type' => Inbox::POST_TYPE,
668+
'post_status' => 'publish',
669+
'post_content' => \wp_json_encode( $activity->to_array() ),
670+
'guid' => 'https://remote.example.com/activities/duplicate-guid',
671+
)
672+
);
673+
\add_post_meta( $inbox_id_2, '_activitypub_user_id', 3 );
674+
\add_post_meta( $inbox_id_2, '_activitypub_user_id', 4 );
675+
676+
$inbox_id_3 = \wp_insert_post(
677+
array(
678+
'post_type' => Inbox::POST_TYPE,
679+
'post_status' => 'publish',
680+
'post_content' => \wp_json_encode( $activity->to_array() ),
681+
'guid' => 'https://remote.example.com/activities/duplicate-guid',
682+
)
683+
);
684+
\add_post_meta( $inbox_id_3, '_activitypub_user_id', 5 );
685+
686+
// Run deduplication.
687+
$result = Inbox::deduplicate( 'https://remote.example.com/activities/duplicate-guid' );
688+
689+
// Should return the first post.
690+
$this->assertInstanceOf( 'WP_Post', $result );
691+
$this->assertEquals( $inbox_id_1, $result->ID );
692+
693+
// Verify all recipients were merged.
694+
$recipients = Inbox::get_recipients( $inbox_id_1 );
695+
$this->assertCount( 5, $recipients );
696+
$this->assertContains( 1, $recipients );
697+
$this->assertContains( 2, $recipients );
698+
$this->assertContains( 3, $recipients );
699+
$this->assertContains( 4, $recipients );
700+
$this->assertContains( 5, $recipients );
701+
702+
// Verify duplicates were deleted.
703+
$this->assertNull( \get_post( $inbox_id_2 ) );
704+
$this->assertNull( \get_post( $inbox_id_3 ) );
705+
706+
// Verify only one post exists with this GUID.
707+
$posts = \get_posts(
708+
array(
709+
'post_type' => Inbox::POST_TYPE,
710+
'guid' => 'https://remote.example.com/activities/duplicate-guid',
711+
'posts_per_page' => -1,
712+
)
713+
);
714+
$this->assertCount( 1, $posts );
715+
}
716+
717+
/**
718+
* Test deduplicate function with non-existent GUID.
719+
*
720+
* @covers ::deduplicate
721+
*/
722+
public function test_deduplicate_non_existent() {
723+
$result = Inbox::deduplicate( 'https://remote.example.com/activities/non-existent' );
724+
$this->assertFalse( $result );
725+
}
582726
}

0 commit comments

Comments
 (0)