Small Swoole Rx Events

Reactive event bus for PHP powered by RxPHP and Swoole.
It lets you publish/subscribe domain and infrastructure events, compose pipelines with Rx operators, and run time-based operators on Swoole’s event loop.

EventBus — simple Rx‐backed bus with on…


This content originally appeared on DEV Community and was authored by sebk69

Reactive event bus for PHP powered by RxPHP and Swoole.

It lets you publish/subscribe domain and infrastructure events, compose pipelines with Rx operators, and run time-based operators on Swoole’s event loop.

  • EventBus — simple Rx‐backed bus with on(), onMany(), payloads(), once(), request()
  • SwooleSchedulerAsyncSchedulerInterface using Swoole\Timer (works with RxPHP time operators)
  • Event modelBasicEvent (name, payload, meta, rid) and EventInterface (correlation id)

Requirements

  • PHP 8.3+
  • ext-swoole 4.8+ / 5.x
  • reactivex/rxphp (2.x)

Installation

composer require small/swoole-rx-events

Quick start

use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;
use Small\SwooleRxEvents\Event\BasicEvent;

// Use the Swoole async scheduler
$bus = new EventBus(new SwooleScheduler());

// Subscribe by name
$bus->on('order.created')->subscribe(function ($e) {
    echo "order rid={$e->getRid()} payload=", json_encode($e->getPayload()), PHP_EOL;
});

// Emit an event
$bus->emitName('order.created', ['id' => 123]);

// If you’re in a plain CLI script, keep the loop alive briefly:
\Swoole\Timer::after(20, fn () => \Swoole\Event::exit());
\Swoole\Event::wait();

Concepts

Event

All event must implement EventInterface

namespace Small\SwooleRxEvents\Contract;

interface EventInterface
{

    public function getName(): string;
    public function getRid(): string;
    public function setRid(string $rid): self;

}

BasicEvent carries:

  • name (string)
  • payload (array)
  • meta (array, e.g. tracing, user)
  • rid (string, auto‐generated correlation id)

Bus

  • stream() — all events
  • on($name) / onMany([...]) — filtered streams
  • payloads($name) — payload‐only stream
  • once($name, ?map, ?timeoutMs) — resolve first matching event (optionally mapped)
  • request($requestName, $responseName, $payload = [], $meta = [], ?$timeoutMs) Emits a request with a new rid, waits for the first response with the same rid.

Timeouts require an async scheduler. This library provides SwooleScheduler which implements AsyncSchedulerInterface.

API Examples

1) Listen & emit

$bus->on('user.created')->subscribe(fn($e) => audit($e->getMeta(), $e->getPayload()));
$bus->emitName('user.created', ['id' => 42], ['by' => 'admin']);

2) Request/Response with correlation id

// Responder: copies rid from incoming 'REQ' and emits 'RESP'
$bus->on('REQ')->subscribe(function ($e) use ($bus) {
    $bus->emit(
        (new BasicEvent('RESP', ['ok' => true], $e->getMeta()))
            ->setRid($e->getRid())   // correlate
    );
});

// Caller: request() subscribes FIRST, then emits; no race conditions
$bus->request('REQ', 'RESP', ['foo' => 'bar'], ['trace' => 'abc'], 100)
    ->subscribe(
        fn($resp) => var_dump($resp->getPayload()),          // ['ok' => true]
        fn($err)  => error_log($err->getMessage())
    );

3) once() with mapping & timeout

$bus->once('health.ok', fn($e) => $e->getMeta()['node'] ?? 'unknown', 50)
    ->subscribe(
        fn($node) => echo "node=$node\n",
        fn($err)  => echo "timeout\n"
    );
$bus->emitName('health.ok', [], ['node' => 'api-1']);

4) Backpressure / batching (Rx composition)

$bus->on('order.created')
    ->bufferWithTimeOrCount(500, 100, $bus->scheduler()) // every 0.5s or 100 items
    ->filter(fn($batch) => !empty($batch))
    ->subscribe(fn(array $batch) => persist_batch($batch));

Swoole integration tips

  • HTTP server: in on('request'), emit an event with meta containing a respond callable or the Response object. Downstream subscribers can produce a ResponseEvent.
  • Coroutines per subscriber: use Swoole coroutines in your subscribers if you do IO; Rx operators will orchestrate sequencing.
  • Event loop in CLI: outside a Swoole Server, start/stop the reactor with Swoole\Event::wait() / Event::exit() for timers to fire.


This content originally appeared on DEV Community and was authored by sebk69


Print Share Comment Cite Upload Translate Updates
APA

sebk69 | Sciencx (2025-09-19T20:59:56+00:00) Small Swoole Rx Events. Retrieved from https://www.scien.cx/2025/09/19/small-swoole-rx-events/

MLA
" » Small Swoole Rx Events." sebk69 | Sciencx - Friday September 19, 2025, https://www.scien.cx/2025/09/19/small-swoole-rx-events/
HARVARD
sebk69 | Sciencx Friday September 19, 2025 » Small Swoole Rx Events., viewed ,<https://www.scien.cx/2025/09/19/small-swoole-rx-events/>
VANCOUVER
sebk69 | Sciencx - » Small Swoole Rx Events. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2025/09/19/small-swoole-rx-events/
CHICAGO
" » Small Swoole Rx Events." sebk69 | Sciencx - Accessed . https://www.scien.cx/2025/09/19/small-swoole-rx-events/
IEEE
" » Small Swoole Rx Events." sebk69 | Sciencx [Online]. Available: https://www.scien.cx/2025/09/19/small-swoole-rx-events/. [Accessed: ]
rf:citation
» Small Swoole Rx Events | sebk69 | Sciencx | https://www.scien.cx/2025/09/19/small-swoole-rx-events/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.