Skip to content
4 changes: 4 additions & 0 deletions .github/changelog/2360-from-description
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Significance: minor
Type: added

Added new configuration options to better manage traffic spikes when federating posts, allowing finer control over retry limits, delays, and batch pauses.
83 changes: 71 additions & 12 deletions includes/class-dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,76 @@
* @see https://www.w3.org/TR/activitypub/
*/
class Dispatcher {

/**
* Batch size.
*
* @deprecated unreleased {@see Activitypub\Dispatcher::get_batch_size()}
*
* @var int
*/
public static $batch_size = ACTIVITYPUB_OUTBOX_PROCESSING_BATCH_SIZE;

/**
* Error codes that qualify for a retry.
* Get the batch size for processing outbox items.
*
* @return int The batch size.
*/
public static function get_batch_size() {
/**
* Filters the batch size for processing outbox items.
*
* @param int $batch_size The batch size. Default ACTIVITYPUB_OUTBOX_PROCESSING_BATCH_SIZE.
*/
return apply_filters( 'activitypub_dispatcher_batch_size', ACTIVITYPUB_OUTBOX_PROCESSING_BATCH_SIZE );
}

/**
* Get the maximum number of retry attempts.
*
* @return int The maximum number of retry attempts.
*/
public static function get_retry_max_attempts() {
/**
* Filters the maximum number of retry attempts.
*
* @param int $retry_max_attempts The maximum number of retry attempts. Default ACTIVITYPUB_OUTBOX_RETRY_MAX_ATTEMPTS.
*/
return apply_filters( 'activitypub_dispatcher_retry_max_attempts', 3 );
}

/**
* Get the retry delay unit (in seconds).
*
* Used to calculate exponential backoff: time() + (attempt * attempt * retry_delay_unit).
*
* @return int The retry delay unit in seconds.
*/
public static function get_retry_delay_unit() {
/**
* Filters the retry delay unit (in seconds).
*
* Used to calculate exponential backoff: time() + (attempt * attempt * retry_delay_unit).
*
* @param int $retry_delay_unit The retry delay unit in seconds. Default ACTIVITYPUB_OUTBOX_RETRY_DELAY_UNIT.
*/
return apply_filters( 'activitypub_dispatcher_retry_delay_unit', HOUR_IN_SECONDS );
}

/**
* Get the error codes that qualify for a retry.
*
* @see https://github.com/tfredrich/RestApiTutorial.com/blob/fd08b0f67f07450521d143b123cd6e1846cb2e3b/content/advanced/responses/retries.md
* @var int[]
*
* @return int[] The error codes.
*/
public static $retry_error_codes = array( 408, 429, 500, 502, 503, 504 );
public static function get_retry_error_codes() {
/**
* Filters the error codes that qualify for a retry.
*
* @param int[] $retry_error_codes The error codes. Default array( 408, 429, 500, 502, 503, 504 ).
*/
return apply_filters( 'activitypub_dispatcher_retry_error_codes', array( 408, 429, 500, 502, 503, 504 ) );
}

/**
* Initialize the class, registering WordPress hooks.
Expand Down Expand Up @@ -82,7 +137,7 @@ public static function process_outbox( $id ) {
\do_action(
'activitypub_send_activity',
$outbox_item->ID,
self::$batch_size,
self::get_batch_size(),
\get_post_meta( $outbox_item->ID, '_activitypub_outbox_offset', true ) ?: 0 // phpcs:ignore
);
} else {
Expand All @@ -95,13 +150,17 @@ public static function process_outbox( $id ) {
/**
* Asynchronously runs batch processing routines.
*
* @param int $outbox_item_id The Outbox item ID.
* @param int $batch_size Optional. The batch size. Default ACTIVITYPUB_OUTBOX_PROCESSING_BATCH_SIZE.
* @param int $offset Optional. The offset. Default 0.
* @param int $outbox_item_id The Outbox item ID.
* @param int|null $batch_size Optional. The batch size. Default null (uses filtered batch size).
* @param int $offset Optional. The offset. Default 0.
*
* @return array|void The next batch of followers to process, or void if done.
*/
public static function send_to_followers( $outbox_item_id, $batch_size = ACTIVITYPUB_OUTBOX_PROCESSING_BATCH_SIZE, $offset = 0 ) {
public static function send_to_followers( $outbox_item_id, $batch_size = null, $offset = 0 ) {
if ( null === $batch_size ) {
$batch_size = self::get_batch_size();
}

$outbox_item = \get_post( $outbox_item_id );
$json = Outbox::get_activity( $outbox_item_id )->to_json();
$inboxes = Followers::get_inboxes_for_activity( $json, $outbox_item->post_author, $batch_size, $offset );
Expand Down Expand Up @@ -167,7 +226,7 @@ public static function retry_send_to_followers( $transient_key, $outbox_item_id,
$retries = self::send_to_inboxes( $inboxes, $outbox_item_id );

// Retry failed inboxes.
if ( ++$attempt < 3 && ! empty( $retries ) ) {
if ( ++$attempt < self::get_retry_max_attempts() && ! empty( $retries ) ) {
self::schedule_retry( $retries, $outbox_item_id, $attempt );
}
}
Expand Down Expand Up @@ -196,7 +255,7 @@ private static function send_to_inboxes( $inboxes, $outbox_item_id ) {
foreach ( $inboxes as $inbox ) {
$result = safe_remote_post( $inbox, $json, $outbox_item->post_author );

if ( is_wp_error( $result ) && in_array( $result->get_error_code(), self::$retry_error_codes, true ) ) {
if ( is_wp_error( $result ) && in_array( $result->get_error_code(), self::get_retry_error_codes(), true ) ) {
$retries[] = $inbox;
}

Expand Down Expand Up @@ -227,7 +286,7 @@ private static function schedule_retry( $retries, $outbox_item_id, $attempt = 1
\set_transient( $transient_key, $retries, WEEK_IN_SECONDS );

\wp_schedule_single_event(
\time() + ( $attempt * $attempt * HOUR_IN_SECONDS ),
\time() + ( $attempt * $attempt * self::get_retry_delay_unit() ),
'activitypub_retry_activity',
array( $transient_key, $outbox_item_id, $attempt )
);
Expand Down
20 changes: 17 additions & 3 deletions includes/class-scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ class Scheduler {
*/
private static $batch_callbacks = array();

/**
* Get the pause between async batches (in seconds).
*
* @return int The pause in seconds.
*/
public static function get_async_batch_pause() {
/**
* Filters the pause between async batches (in seconds).
*
* @param int $async_batch_pause The pause in seconds. Default 30.
*/
return apply_filters( 'activitypub_scheduler_async_batch_pause', 30 );
}

/**
* Initialize the class, registering WordPress hooks.
*/
Expand Down Expand Up @@ -140,7 +154,7 @@ public static function deregister_schedules() {
public static function unschedule_events_for_item( $outbox_item_id ) {
$event_args = array(
$outbox_item_id,
Dispatcher::$batch_size,
Dispatcher::get_batch_size(),
\get_post_meta( $outbox_item_id, '_activitypub_outbox_offset', true ) ?: 0, // phpcs:ignore
);

Expand Down Expand Up @@ -276,7 +290,7 @@ public static function reprocess_outbox() {
foreach ( $ids as $id ) {
// Bail if there is a pending batch.
$offset = \get_post_meta( $id, '_activitypub_outbox_offset', true ) ?: 0; // phpcs:ignore
if ( \wp_next_scheduled( 'activitypub_send_activity', array( $id, ACTIVITYPUB_OUTBOX_PROCESSING_BATCH_SIZE, $offset ) ) ) {
if ( \wp_next_scheduled( 'activitypub_send_activity', array( $id, Dispatcher::get_batch_size(), $offset ) ) ) {
return;
}

Expand Down Expand Up @@ -424,7 +438,7 @@ public static function async_batch() {

if ( ! empty( $next ) ) {
// Schedule the next run, adding the result to the arguments.
\wp_schedule_single_event( \time() + 30, \current_action(), \array_values( $next ) );
\wp_schedule_single_event( \time() + self::get_async_batch_pause(), \current_action(), \array_values( $next ) );
}
}

Expand Down