Skip to content

Latest commit

 

History

History

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 

Channel

Introduction

The channel component of PSL provides a way to create a channels for communication between one, or more coroutine(s).

Usage

use Psl\Async;
use Psl\Channel;

/**
 * @var Channel\ReceiverInterface<string> $receiver
 * @var Channel\SenderInterface<string> $sender
 */
[$receiver, $sender] = Channel\unbounded();

// send a message to the channel after 1 second.
Async\Scheduler::delay(1, static fn() => $sender->send('Hello'));
// wait for the message.
$message = $receiver->receive();

API

Functions

  • [@template T php]
    [Channel\bounded(int<1, max> $capacity): array{Channel\ReceiverInterface<T>, Channel\SenderInterface<T>} php]

    Create a bounded channel with the given capacity.

    The created channel has space to hold at most [$capacity php] messages at a time.

    • [$capacity php]: The capacity of the channel.

    If [$capacity php] is not a positive integer, [Psl\Exception\InvariantViolationException php] is thrown.

    use Psl;
    use Psl\Channel;
    
    /**
     * @var Channel\SenderInterface<string> $sender
     * @var Channel\ReceiverInterface<string> $receiver
     */
    [$receiver, $sender] = Channel\bounded(1);
    
    $sender->send('Hello');
    Psl\invariant('Hello' === $receiver->receive(), 'Should receive "Hello"');
  • [@template T php]
    [Channel\unbounded(): array{0: Channel\ReceiverInterface<T>, 1: Channel\SenderInterface<T>} php]

    Creates an unbounded channel.

    The created channel can hold an unlimited number of messages.

    use Psl;
    use Psl\Channel;
    
    /**
     * @var Channel\ReceiverInterface<string> $receiver
     * @var Channel\SenderInterface<string> $sender
     */
    [$receiver, $sender] = Channel\unbounded();
    
    $sender->send('Hello');
    Psl\invariant('Hello' === $receiver->receive(), 'Should receive "Hello"');

Interfaces

  • [interface Channel\ChannelInterface<T> extends Countable php]

    Common interface for all channel types.

    • [ChannelInterface::getCapacity(): ?int<1, max> php]

      Returns the capacity of the channel, or null if the channel is unbounded.

      use Psl;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\unbounded();
      
      Psl\invariant(null === $receiver->getCapacity(), 'Capacity should be null');
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\bounded(10);
      
      Psl\invariant(10 === $receiver->getCapacity(), 'Capacity should be 10');
    • [ChannelInterface::isEmpty(): bool php]

      Returns true if the channel is empty, false otherwise.

      use Psl;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\unbounded();
      
      Psl\invariant(true === $receiver->isEmpty(), 'Channel should be empty');
      $sender->send('Hello');
      Psl\invariant(false === $receiver->isEmpty(), 'Channel should not be empty');
      $receiver->receive();
      Psl\invariant(true === $receiver->isEmpty(), 'Channel should be empty');
    • [ChannelInterface::isFull(): bool php]

      Returns true if the channel is full, false otherwise.

      Unbounded channels are never full.

      use Psl;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\bounded(2);
      
      Psl\invariant(false === $receiver->isFull(), 'Channel should not be full');
      $sender->send('Hello');
      $sender->send('World');
      Psl\invariant(true === $receiver->isFull(), 'Channel should be full');
      $receiver->receive();
      Psl\invariant(false === $receiver->isFull(), 'Channel should not be full');
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\unbounded();
      
      Psl\invariant(false === $receiver->isFull(), 'Unbound channel is never full.');
    • [ChannelInterface::count(): int<0, max> php]

      Returns the number of items in the channel.

      use Psl;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\unbounded();
      
      Psl\invariant(0 === $receiver->count(), 'Channel should be empty');
      $sender->send('Hello');
      Psl\invariant(1 === $receiver->count(), 'Channel should have 1 item');
      $receiver->receive();
      Psl\invariant(0 === $receiver->count(), 'Channel should be empty');
    • [ChannelInterface::close(): void php]

      Closes the channel.

      The remaining messages can still be received.

      use Psl;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\unbounded();
      
      $sender->send('Hello');
      
      Psl\invariant(false === $receiver->isClosed(), 'Channel should not be closed');
      // Close the channel.
      $receiver->close();
      Psl\invariant(true === $receiver->isClosed(), 'Channel should be closed');
      Psl\invariant('Hello' === $receiver->receive(), 'Should receive "Hello"');
      
      try {
          $receiver->receive();
      } catch (Channel\Exception\ClosedChannelException $e) {
          // Cannot receive from a closed empty channel.
      }
      
      try {
          $sender->send('World');
      } catch (Channel\Exception\ClosedChannelException $e) {
          // Cannot send to a closed channel.
      }
    • [ChannelInterface::isClosed(): bool php]

      Returns true if the channel is closed.

      use Psl;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\unbounded();
      
      Psl\invariant(false === $receiver->isClosed(), 'Channel should not be closed');
      Psl\invariant(false === $sender->isClosed(), 'Channel should not be closed');
      
      $receiver->close();
      
      Psl\invariant(true === $receiver->isClosed(), 'Channel should be closed');
      Psl\invariant(true === $sender->isClosed(), 'Channel should be closed');
  • [interface Channel\SenderInterface<T> extends Channel\ChannelInterface<T> php]

    Sender side of the channel

    • [SenderInterface::send(T $message): void php]

      Sends a message to the channel, waiting if the channel is full.

      • [$message php]: The message to send.

      If the channel is full, this method waits until there is space for a message.
      If the channel is closed, this method throws [Channel\Exception\ClosedChannelException php].

      use Psl;
      use Psl\Async;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\bounded(1);
      
      // Send a message to the channel immediately.
      $sender->send('Hello');
      // Receive the message from the channel, after 1 second.
      Async\Scheduler::delay(1, function () use ($receiver) {
          Psl\invariant('Hello' === $receiver->receive(), 'Should receive "Hello"');
      });
      // Send a message to the channel, waiting if the channel is full.
      $sender->send('World');
    • [SenderInterface::trySend(T $message): void php]

      Sends a message to the channel.

      • [$message php]: The message to send.

      If the channel is full, this method throws [Channel\Exception\FullChannelException php].
      If the channel is closed, this method throws [Channel\Exception\ClosedChannelException php].

      use Psl;
      use Psl\Async;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\bounded(1);
      
      // Try to send a message to the channel immediately, without waiting.
      $sender->trySend('Hello');
      
      try {
          // Try to send a message to the channel immediately, without waiting.
          $sender->trySend('World');
      } catch (Channel\Exception\FullChannelException $e) {
          // Cannot send to a full channel.
          // The first message 'Hello' has not been received yet.
      }
  • [interface Channel\ReceiverInterface<T> extends Channel\ChannelInterface<T> php]

    Receiver side of the channel

    • [ReceiverInterface::receive(): T php]

      Receives a message from the channel, waiting if the channel is empty.

      If the channel is empty, this method waits until there is a message.
      If the channel is closed and empty, this method throws [Channel\Exception\ClosedChannelException php].

      use Psl;
      use Psl\Async;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\bounded(1);
      
      $sender->send('Hello');
      // Send a message to the channel, after 1 second.
      Async\Scheduler::delay(1, function () use ($sender) {
          $sender->send('World');
      });
      
      // Receive a message from the channel immediately.
      Psl\invariant('Hello' === $receiver->receive(), 'Should receive "Hello"');
      
      // Receive a message from the channel, waiting if the channel is empty.
      Psl\invariant('World' === $receiver->receive(), 'Should receive "World"');
    • [ReceiverInterface::tryReceive(): T php]

      Receives a message from the channel.

      If the channel is empty, this method throws [Channel\Exception\EmptyChannelException php].
      If the channel is closed and empty, this method throws [Channel\Exception\ClosedChannelException php].

      use Psl;
      use Psl\Async;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\bounded(1);
      
      $sender->send('Hello');
      // Try to receive a message from the channel immediately, without waiting.
      Psl\invariant('Hello' === $receiver->tryReceive(), 'Should receive "Hello"');
      
      try {
        // Try to receive a message from the channel immediately, without waiting.
        $receiver->tryReceive();
      } catch (Channel\Exception\EmptyChannelException $e) {
          // Cannot receive from an empty channel.
      }
  • Exceptions

    • [final class Channel\Exception\FullChannelException implements Channel\Exception\ExceptionInterface extends Exception php]

      Thrown when calling Channel\SenderInterface<T>::trySend() on a full channel.

      use Psl;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\bounded(1);
      
      $sender->send('Hello');
      
      try {
          // Try to send a message to the channel immediately, without waiting.
          $sender->trySend('World');
      } catch (Channel\Exception\FullChannelException $e) {
          // Cannot send to a full channel without waiting.
      }
    • [final class Channel\Exception\EmptyChannelException implements Channel\Exception\ExceptionInterface extends Exception php]

      Thrown when calling Channel\ReceiverInterface<T>::tryReceive() on an empty channel.

      use Psl;
      use Psl\Async;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\bounded(1);
      
      try {
          // Try to receive a message from the channel immediately, without waiting.
          $receiver->tryReceive();
      } catch (Channel\Exception\EmptyChannelException $e) {
          // Cannot receive from an empty channel.
      }
    • [final class Channel\Exception\ClosedChannelException implements Channel\Exception\ExceptionInterface extends Exception php]

      Thrown when calling attempting to send or receive on a closed channel.

      use Psl;
      use Psl\Async;
      use Psl\Channel;
      
      /**
       * @var Channel\ReceiverInterface<string> $receiver
       * @var Channel\SenderInterface<string> $sender
       */
      [$receiver, $sender] = Channel\unbounded();
      
      // Send a message to the channel.
      $sender->send('Hello');
      // Close the channel.
      $receiver->close();
      Psl\invaraint('Hello' === $receiver->receive(), 'Should receive "Hello"');
      
      try {
          $receiver->receive();
      } catch (Channel\Exception\ClosedChannelException $e) {
          // Cannot receive from a closed empty channel.
      }
      
      try {
          $sender->send('World');
      } catch (Channel\Exception\ClosedChannelException $e) {
          // Cannot send to a closed channel.
      }