API Reference

JavaScript (client)

BoardStreams class

new:
new BoardStreams(url)

// examples:
var BS = new BoardStreams('/ws');
var BS = new BoardStreams('ws://example.com/ws_path');
var BS = new BoardStreams('wss://example.com/ws_path');

new starts attempting to connect to the websocket server denoted by the URL parameter. If the scheme and host are missing from the URL, they will be derived from the current page's URL. The object returned holds the websocket connection, and can be used to join channels.

Upon construction and upon disconnection, new connection attempts will be made after every 0, 1, 3, 5, 5, 5... seconds, until one succeeds.

BoardStreams object

joinChannel:
var channel = BS.joinChannel(channelName);

// examples:
var channel = BS.joinChannel('checkers:boards:15');
var channel = BS.joinChannel('chat');

Starts joining a channel. Returns the channel object.

This method does not require a websocket connection to be already open. It will cause the client to attempt to re-join the channel whenever a new websocket connection is opened.

If join is successful, the channel's initialState event will fire with the channel's current state. A backlog of (some or all) past events might also be received, if the server wants that. If joining does not succeed, for example due to authorization failure on the server-side, no data or notification will be sent back to the client, and no event will be triggered.

You may join the same channel more than once by calling this method multiple times with the same parameter. See FAQ for details.

See also: channel.leave()

Events
  • connectionStatus:
    BS.on('connectionStatus', (status) => { ... });

    Fires every time the connection to the websocket server is established or terminated.

    status is a boolean.

Observables

The same channel event is also available in the form of an rxjs 6 observable, and in particular a Behavior Subject. Completion and error do not occur.

  • connectionStatus$:
    BS.connectionStatus$.subscribe({
      next: (status) => ...,
    });

    See connectionStatus event.

Channel object

Events
  • initialState:
    channel.on('initialState', (state) => { ... });

    Fires every time the client joins the channel on the server, including on every re-connection.

    state is the channel's current state object (arbitrary JSON-able data).

  • state:
    channel.on('state', (state) => { ... });

    Fires when the client joins a channel and first receives the state object, and also when the state is updated on the server and the update reaches the client.

    state contains the channel's new state (arbitrary JSON-able data).

  • event:
    channel.on('event', (event) => { ... });

    Fires every time a channel event reaches the client.

    event is arbitrary JSON-able data.

  • eventPatch:
    channel.on('eventPatch', ({ event, patch }) => { ... });
    
    
    // example:
    import BoardStreams from 'boardstreams';
    
    const myState = ...;
    
    channel.on('eventPatch', (eventPatch) => {
      const { event, patch } = eventPatch;
    
      // first do something with event
      if (event !== null) {
        await doSomethingWithEvent(event);
      }
    
      // then apply patch to your own state object
      if (patch !== null && patch !== undefined) {
        BoardStreams.patchState(myState, patch);
      }
    });

    Fires whenever an event happens, whenever the state changes, or whenever both happen at the same time. event or patch will be null if no event or no new state occurred. The patch key does not exist at all in the backlog events received during re-connection.

    This event can be useful if you want to handle events first in order to to do animations before allowing the state to change.

    event is arbitrary JSON-able data, but patch follows a format defined by the BoardStreams library, and can only be used with BoardStreams.patchState.

    patchState(myState, patch) modifies myState in-place, and is a no-op if patch is null or undefined.

    Note: due to the caveats in the Vue 2 reactivity model, you may not use the BoardStreams.patchState method itself directly on Vue 2 reactive state objects. Instead, you can use the API-compatible this.$BSPatchState as a drop-in replacement, as in this example:

    // in main.js of your Vue app:
    import BoardStreams from 'boardstreams';  // not needed when loading BoardStreams from CDN
    Vue.use(BoardStreams.Vue2Plugin);         // not needed when loading BoardStreams from CDN
    
    // in Vue component:
    if (patch !== null && patch !== undefined) {
      this.$BSPatchState(myState, patch);
    }

    Vue 3 does not have this restriction.

Observables

The same channel events are also available in the form of rxjs 6 observables. Completion occurs when the user leaves the channel, not when the connection drops. error does not occur.

  • initialStates$:
    channel.initialStates$.subscribe({
      next: (state) => ...,
      complete: () => ...,
    });

    See initialState event.

  • state$:
    channel.state$.subscribe({
      next: (state) => ...,
      complete: () => ...,
    });

    See state event.

  • events$:
    channel.events$.subscribe({
      next: (event) => ...,
      complete: () => ...,
    });

    See event event.

  • eventsPatches$:
    channel.eventsPatches$.subscribe({
      next: (eventPatch) => ...,
      complete: () => ...,
    });

    See eventPatch event.

Methods
  • doAction:
    channel.doAction(actionName, payload);
    
    // examples:
    channel.doAction('speak', 'Hello, folks!');
    channel.doAction('move', {
      x: 1,
      y: 0
    });

    Sends an action to the server through the channel. The server is programmed to handle it, and no response is sent back.

  • doRequest:
    var promise = channel.doRequest(requestName, payload);
    
    // example:
    try {
      var result = await channel.doRequest('roll_dice', '2d6');
      console.log('Sum of dice is:', result);
    } catch (e) {
      console.error('You lost the magic dice!');
    }

    Sends a request to the server through the channel and returns a promise. The server is programmed to handle the request, and sends back either a success or a failure response with data, which resolves or rejects the promise.

  • off:
    // Either:
    var fn = channel.on(eventName, handler);
    channel.off(eventName, fn);
    
    // or:
    channel.on(eventName, handler);
    channel.off(eventName, handler);

    Unsubscribes a handler from an event.

  • leave:
    channel.leave();

    Leaves the channel, unsubscribes from all channel events, and causes the channel observables to emit complete.

Perl (server)

All of the BoardStreams-related methods can be accessed under $c->bs or $app->bs, unless the prefix configuration option has been set.

App module

# in App module's setup method
$self->plugin(BoardStreams => {
  db_string => $pg_string,
});

This loads the BoardStreams Mojolicious plugin and sets the only required configuration option, which is the postgresql database string that is ultimately passed to the from_string method of Mojo::Pg . $pg_string should take the form "postgresql://$db_user:$db_pass\@$db_host/$db_name".

Other valid (but optional) configuration options are:

  • heartbeat_interval (seconds):
    How often each hypnotoad worker will register to the database that it is alive. Default: 5
  • heartbeat_timeout (seconds):
    How much time needs to pass after a missing heartbeat registration for the worker to be considered dead by the next global_cleanup call. Default: 5
  • prefix (string):
    The prefix prepended to all helpers added by the BoardStreams Mojolicious plugin, e.g. the "bs" in $app->bs->on_action(...). Default: "bs"
  • ping_interval (seconds):
    How often ping messages are sent from the webserver to the clients, in order to keep the websocket connection open. Default: 15
  • notify_payload_size_limit (number of bytes):
    Maximum number of bytes that can be transferred through a single PostgreSQL NOTIFY command. If you decrease this setting from its default of 8000 in PostgreSQL, you will need to set this option here as well. Default: 8000
  • cleanup_interval (seconds):
    Approximate interval between calls to global_cleanup. A value of 0 disables automatic global cleanup calls. Default: 5

Connection methods

initialize_client:
# in Mojolicious Controller
$c->bs->initialize_client;

# Example:
sub ws_handler ($self) {
  $self->bs->initialize_client;
}

This sets up the websocket connection to handle incoming joins, actions, requests and leaves.

joined:
$bool = $c->bs->joined($channel_name);

# examples:
my $have_joined = $c->bs->joined('chat');
my $have_joined = $c->bs->joined('boardgames:chess:15');

Returns a boolean describing whether the client has joined and is in a particular channel.

App methods

on_join:
$app->bs->on_join($channel_path, sub ($channel_name, $c, $attrs) {
  my $is_reconnect = $attrs->{is_reconnect};

  my $limit = calculate_limit($is_reconnect);

  return { limit => $limit }; # or...
  return undef; # or any false value
});


# examples:
$app->bs->on_join('chat', sub ($channel_name, $c, $attrs) {
  # $channel_name is "chat"
  # $attrs is { is_reconnect => 0 or 1 }
});

$app->bs->on_join(['checkers:boards', qr/^\d+$/], sub ($channel_name, $c, $attrs) {
  # $channel_name can be checkers:boards:15 or any other number
});

$app->bs->on_join(['checkers:boards', qr/^\d+$/, 'chat'], sub ($channel_name, $c, $attrs) {
  # $channel_name can be checkers:boards:15:chat, etc
}

# async
$app->bs->on_join('chat', async sub ($channel_name, $c, $attrs) {
  await ...;
  return { limit => $limit };
});

Sets the join handler for a channel (or family of channels), responsible for deciding whether to allow the user to join, and how many of the most recent past events should be sent to the client upon joining.

The sub handler may be sync or async.

$attrs is a hashref that contains information about whether the join request is a fresh join, or whether it has been issued automatically during websocket re-connection after a temporary network disconnect. This information is provided by the client, so do not use it for authorization.

If the sub handler dies or returns a false value, the client's join request will be ignored and join does not occur. A true value means the client will join and start receiving all of this channel's events and state object updates.

If it returns a true value, it needs to be a hashref that contains the limit property. This property shows how many of the most recent past events will be sent to the client before they receive the initial state object. If the join request has been automatically issued due to a re-connect, the client will not receive any events they had already received while they were previously connected.limit can also be set to the string "all", meaning all of the previous events should be sent to the client, starting from the one after the last one they received through their previous connection if this is a re-connect.

The limit property defaults to 0.

The channel needs to exist before the join handler is executed.

on_leave:
$app->bs->on_leave($chanel_path, sub ($channel_name, $c) { ... });

# examples:
$app->bs->on_leave('chat', sub ($channel_name, $c) { ... });
$app->bs->on_leave(['chess', qr/^\d+$/], sub ($channel_name, $c) { ... });

Sets the leave handler for a channel or family of channels.

The handler is called when the client issues a channel.leave() command, or when the websocket connection closes.

Creating this handler is optional.

on_action:
$app->bs->on_action($channel_path, $action_name, sub ($channel_name, $c, $payload) { ... });

# examples:
$app->bs->on_action('chat', 'speak', sub ($channel_name, $c, $payload) { ... });
$app->bs->on_action(['boardgames:chess', qr/^\d+$/], 'move', sub ($channel_name, $c, $payload) { ... });

Sets the handler for the client action with the particular $action_name.

Its return value is ignored.

on_request:
$app->bs->on_request($channel_path, $request_name, sub ($channel_name, $c, $payload) { ... });

# examples:
$app->bs->on_request('chat', 'whoami', sub ($channel_name, $c, $payload) { ... });
$app->bs->on_request('chat', 'whoami', async sub ($channel_name, $c, $payload) { ... });

Sets the handler for the client request with the particular $request_name.

The handler sub can be async, and its return value is returned to the client.

If the request handler sub dies, or returns a promise that is rejected, the client will get notified of the error, and their JavaScript promise will also get rejected. If the error is JSON-able, it is sent as-is, otherwise Perl will stringify it before sending it.

on_cleanup:
$app->bs->on_cleanup('my_channel', sub ($channel_name, $txn, $alive_workers) {
  $txn->lock_state(...);
});

See guard count .

create_channel:
$app->bs->create_channel($channel_name, $starting_state, $channel_attrs = {});

# examples:
$app->bs->create_channel('boardgames:chess:15', {
  players => [],
  pieces => { ... },
});

$app->bs->create_channel('jukebox',
  { currentlyPlaying => undef },
  { type => 'widgets', keep_events => 0 },
);

Creates a channel that clients can join. Returns false if channel already exists, true if channel was created. So it's okay to call it more than once.

$starting_state is arbitrary JSON-able data describing the channel's initial state object.

$channel_attrs is an optional hashref that may contain keys type (string, defaults to '') andkeep_events (boolean, defaults to 1).

type is an arbitrary string of up to 50 characters long that is stored in "type", an indexed field of the database's channel row, to allow you to search channels faster in your own scripts.

keep_events defines whether or not channel events should be stored in the database. You may set it to 0 if you're not interested in this channel sending past events to clients upon connection. See also: delete_events

pg:

Returns the Mojo::Pg object used by the BoardStreams library. E.g.:

my $pg = $app->bs->pg;

$pg->max_connections(3);

Both connection and app methods

worker_uuid:
my $uuid = $c->bs->worker_uuid;
Returns the globally unique identifier of the hypnotoad worker process.
lock_state:
$c->bs->lock_state($channel_name, sub ($state) { ... });
$app->bs->lock_state($channel_name, sub ($state) { ... });

# example:
$app->bs->lock_state('chess:board:14', sub ($state) {
  $state->{pieces}{white}{king}{position} = 'B5';
  my $event = { player => 'white', from => 'A4', to => 'B5' };

  return $event, $state;
});

Locks a channel's state object to allow it to be modified without conflicts from other processes.

The sub receives the current state object, and should return an event and the modified state object. Both return values are optional, but need to be replaced with undef if missing. If they are undef it means there is no event or no modification in the channel's state object. Otherwise the state will be saved, and the event and/or state update will be sent to all clients in this channel. This event & state update pair causes the channel on the client to emit an eventPatch event.

If the sub dies, an exception will be thrown. If you want to avoid triggering an eventPatch event, you can
return undef, undef;.

The sub can only be synchronous.

  • Sending more than one events in a single lock_state:

    Return a reference to an arrayref containing all the events you want to emit, instead of $event:

    $c->bs->lock_state('chess:board:14', sub ($state) {
      $state = ...;
      my @events = (
        { move => ['a1', 'c3'] },
        { won => 'white' },
      );
    
      return \[@events], $state;
    });
    The last event will be sent together with the state diff in the same websocket message, and will trigger the same eventPatch event on the client.
  • Locking more than one channels with a single lock_state:

    You can lock multiple channels with a single lock_state call by replacing $channel_name with \@channel_names.

    The sub will receive an arrayref of all the states, and it should return a list of [$event, $new_state] arrayrefs, one per channel:

    # nuclear attack to area3 and area4, in a game with a huge map
    $c->bs->lock_state(['strategy:area:3', 'strategy:area:4'], sub ($states) {
      my ($state3, $state4) = @$states;
    
      $state3->{units} = []; # everything is gone
      $state4->{units} = []; # everything is gone
    
      my $event3 = 'everything is gone';
      my $event4 = 'everything is gone';
    
      return [$event3, $state3], [$event4, $state4];
    });
  • Calling lock_state multiple times in the same database transaction:

    You can call lock_state multiple times (nested or not) on a $txn transaction object you create with $c->bs->begin and commit with $txn->commit. Dying anywhere in between will rollback the entire transaction.

    my $txn = $c->bs->begin;
    
    # first lock_state
    $txn->lock_state('area3', sub ($state3) {
      $state3->{foo} = 'bar';
    
      if ($state3->{baz} eq 'abc') {
    
        # second lock_state
        $txn->lock_state('area4', sub ($state4) {
          $state4->{foo} = 'bar';
          return undef, $state4;
        });
    
      }
    
      return undef, $state3;
    });
    
    $txn->commit;
  • Increasing/decreasing the guard count of a channel/worker pair:

    To protect the consistency of your state objects against the possibility of a postgresql server restart that would prevent the normal leave callbacks from being called if they contain lock_state calls, you can use guard counts and cleanup functions.

    Guard count increase of the channel/worker pair is the third parameter you can return from a lock_state sub.

    A positive total guard count of the channel/worker pair when the worker has timed-out after no longer emitting heartbeats, means that in the next global_cleanup call this channel will have its user-defined cleanup sub executed.

    The total guard count of a channel/worker pair cannot go below zero.

    $app->bs->on_join('chat', sub ($channel_name, $c, $attrs) {
      my $username = ...;
      my $worker_uuid = $c->bs->worker_uuid;
    
      $c->bs->lock_state('chat', sub ($state) {
        $state->{users}{$username}{$worker_uuid}++;
    
        # 1 is the increase of the guard count of the pair
        return undef, $state, 1;
      });
    });
    
    $app->bs->on_leave('chat', sub ($channel_name, $c) {
      my $username = ...;
      my $worker_uuid = $c->bs->worker_uuid;
    
      $c->bs->lock_state('chat', sub ($state) {
        $state->{users}{$username}{$worker_uuid}--;
        if ($state->{users}{$username}{$worker_uuid} <= 0) {
          delete $state->{users}{$username}{$worker_uuid};
          if (! $state->{users}{$username}->%*) {
            delete $state->{users}{$username};
          }
        }
    
        # decrease the guard count by 1
        return undef, $state, -1;
      });
    });
    
    # remove stale usernames from chatroom, after a postgresql restart
    $app->bs->on_cleanup('chat', sub ($channel_name, $txn, $alive_workers) {
    
      $txn->lock_state($channel_name, sub ($state) {
        foreach my $username (keys $state->{users}->%*) {
          foreach my $worker_uuid (keys $state->{users}{$username}->%*) {
            if (! $alive_workers->{$worker_uuid}) {
              delete $state->{users}{$username}{$worker_uuid};
              if (! $state->{users}{$username}->%*) {
                delete $state->{users}{$username};
              }
            }
          }
        }
    
        return undef, $state; # could also return undef to signify no change
      });
    
    });

    If you want to do a lock_state in your cleanup handler as above, you should use the $txn object as its invocant, as this transaction is the one that will remove the guard counters once the channel's cleanup is complete.

delete_events:

Channel events you create accumulate in the database, if the channel's keep_events flag is on. You can delete them with delete_events.

# keep the last $n events
$app->bs->delete_events($channel_name, keep_num => $n);

# keep events of last 5 minutes (postgresql duration syntax)
$app->bs->delete_events($channel_name, keep_dur => '5 minute');

# keep events of last 5 minutes, or $n events (whichever is more)
$app->bs->delete_events($channel_name,
  keep_dur => '5 minute',
  keep_num => $n,
);

Perl (client)

BoardStreams::Client class

new:
BoardStreams::Client->new($url)

# examples:
my $BS = BoardStreams::Client->new('ws://example.com/ws_path');
my $BS = BoardStreams::Client->new('wss://example.com/ws_path');
See JS new .

BoardStreams::Client object

join_channel:
my $channel = BS.joinChannel($channel_name);
See JS joinChannel .
Events
Observables
  • connection_status_o:
    $BS->connection_status_o->subscribe({
      next => sub ($status) { ... },
    });
    See JS connectionStatus$ .

Channel object

Events
  • initial_state, state, event, event_patch
    $channel->on('initial_state', sub ($channel, $state) { ... });
    $channel->on('state', sub ($channel, $state) { ... });
    $channel->on('event', sub ($channel, $event) { ... });
    $channel->on('event_patch', sub ($channel, $ev_pa) {
      my ($event, $patch) = $ev_pa->@{qw/ event patch /};
      ...
    });
    See JS channel events .
Observables
  • initial_states_o, state_o, events_o, events_patches_o
    $channel->initial_states_o->subscribe({
      next => sub ($state) { ... },
      complete => sub () { ... },
    });
    
    # etc
    See JS channel observables .
Methods