GenStage patterns

Currently you seem to be on a fixed-period “flush cycle”. Process.send_after/4 returns a timer reference which can be passed to Process.cancel_timer/1 to cancel the pending timer if you happen to release events early in handle_events. In fact, you should be able to factor out a common function, shared by handle_events and handle_info, that releases the events and renews the timer.
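As a rough illustration of that factoring, here is a minimal sketch, assuming the `gen_stage` dependency and a `:producer_consumer` stage. The module name `Flusher`, the `:max` and `:period` options, and the token scheme are my own assumptions, not something from your code. The token is there because cancelling a timer cannot recall a message that has already landed in the mailbox, so stale flush messages are simply ignored.

```elixir
defmodule Flusher do
  use GenStage

  # Hypothetical defaults, purely for illustration.
  @default_max 100
  @default_period 1_000

  def start_link(opts \\ []), do: GenStage.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      buffer: [],
      max: Keyword.get(opts, :max, @default_max),
      period: Keyword.get(opts, :period, @default_period),
      timer: nil,
      token: nil
    }

    {:producer_consumer, schedule(state)}
  end

  @impl true
  def handle_events(events, _from, state) do
    state = %{state | buffer: state.buffer ++ events}

    # Release early once the buffer is full, otherwise keep holding.
    if length(state.buffer) >= state.max do
      release(state)
    else
      {:noreply, [], state}
    end
  end

  @impl true
  # Only the flush message carrying the current token counts.
  def handle_info({:flush, token}, %{token: token} = state), do: release(state)
  # A flush from a timer that was superseded: ignore it.
  def handle_info({:flush, _stale}, state), do: {:noreply, [], state}

  # The common function: emit the buffer and renew the timer.
  defp release(state) do
    {:noreply, state.buffer, schedule(%{state | buffer: []})}
  end

  defp schedule(state) do
    # Cancel the pending timer (if any) before starting a fresh one.
    if state.timer, do: Process.cancel_timer(state.timer)
    token = make_ref()
    timer = Process.send_after(self(), {:flush, token}, state.period)
    %{state | timer: timer, token: token}
  end
end
```

Both the size-triggered path and the timer-triggered path end up in `release/1`, so the flush period always restarts from the most recent release.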

I don’t know about patterns, but based on your description I’d start with this:

  • Write a simple GenServer that is dedicated to a single WebSocket. The idea being that the GenServer’s mailbox becomes your “unbounded buffer” (i.e. let the VM handle it). The GenServer itself processes one message at a time and blocks until the WebSocket is finished sending the current message.
  • Have those “Observer” WebSocket GenServers register with an “Observable” GenServer (a reference to the Observer Pattern, but not in an OO way). The “Observable” GenServer simply sends every event it receives to all the registered “Observers” (immediately and in turn).
  • Finally, write a “Tap” GenStage. The “Tap” simply forwards all the events it receives, but only after it has sent a copy to another process (in this case the “Observable” GenServer). It might be tempting to combine “Tap” and “Observable”, but the priority is to get the events to the datastore. By sticking to a simple “Tap” GenStage, the added delay on the way to the datastore is limited to the time it takes to put a copy of the events into the “Observable” mailbox. Distribution of the events happens on the “Observable’s” time, and pushing messages into the WebSockets happens on each “Observer’s” time. (The separation also creates the opportunity for the “Observable” process to run on a different CPU core than the “Tap” process.)
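The three pieces above could be sketched roughly like this, assuming the `gen_stage` dependency. All module and function names are hypothetical, `send_fun` is a stand-in for whatever blocking call actually writes to your WebSocket, and the `Process.monitor/1` cleanup is my own addition so that dead Observers drop out of the set.

```elixir
defmodule Observable do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def register(server, pid), do: GenServer.call(server, {:register, pid})
  # A cast, so the caller only pays for putting a message in the mailbox.
  def publish(server, events), do: GenServer.cast(server, {:publish, events})

  @impl true
  def init(:ok), do: {:ok, MapSet.new()}

  @impl true
  def handle_call({:register, pid}, _from, observers) do
    Process.monitor(pid)
    {:reply, :ok, MapSet.put(observers, pid)}
  end

  @impl true
  def handle_cast({:publish, events}, observers) do
    # Fan out to every registered Observer, immediately and in turn.
    Enum.each(observers, &send(&1, {:events, events}))
    {:noreply, observers}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, observers) do
    {:noreply, MapSet.delete(observers, pid)}
  end
end

defmodule Observer do
  use GenServer

  # `send_fun` is a hypothetical 1-arity function that blocks until
  # the WebSocket has finished sending one message.
  def start_link(observable, send_fun) do
    GenServer.start_link(__MODULE__, {observable, send_fun})
  end

  @impl true
  def init({observable, send_fun}) do
    :ok = Observable.register(observable, self())
    {:ok, send_fun}
  end

  @impl true
  def handle_info({:events, events}, send_fun) do
    # The mailbox is the unbounded buffer; events go out one at a time.
    Enum.each(events, send_fun)
    {:noreply, send_fun}
  end
end

defmodule Tap do
  use GenStage

  def start_link(observable), do: GenStage.start_link(__MODULE__, observable)

  @impl true
  def init(observable), do: {:producer_consumer, observable}

  @impl true
  def handle_events(events, _from, observable) do
    # Copy to the Observable first, then forward unchanged downstream.
    Observable.publish(observable, events)
    {:noreply, events, observable}
  end
end
```

You would subscribe the “Tap” between your existing producer and the datastore consumer in the usual way. The only extra work on the datastore path is the single `GenServer.cast/2` in `Tap.handle_events/3`.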