Module Async_bus

Async operations on Core.Bus.

val pipe1_exn : here:lexing_position -> ('a -> unit, [> Core.read ]) Bus.t -> 'a Async_kernel.Pipe.Reader.t

pipe1_exn t returns a pipe of updates from t by subscribing to t. Closing the pipe unsubscribes from t. Closing t closes the pipe. Calling pipe1_exn on a closed bus always returns an empty pipe. pipe1_exn raises in the same circumstances as subscribe_exn.
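As a minimal sketch of how pipe1_exn might be used, the helper below consumes a bus of strings through a pipe. The name print_updates is hypothetical, and the code assumes the core and async libraries plus ppx_jane (for [%here]) are available, and that here is passed as a labelled argument as in the signature above.

```ocaml
open! Core
open! Async

(* [print_updates] is a hypothetical helper, not part of Async_bus. It
   subscribes to [bus] via a pipe and prints each published string. The
   returned deferred becomes determined once the pipe is closed, for example
   because [bus] itself was closed. *)
let print_updates (bus : (string -> unit, [> read ]) Bus.t) : unit Deferred.t =
  let updates = Async_bus.pipe1_exn ~here:[%here] bus in
  Pipe.iter_without_pushback updates ~f:print_endline
```

A caller that wants to stop early can keep the reader and call Pipe.close_read on it, which, per the description above, also unsubscribes from the bus.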

val pipe1_filter_map_exn : here:lexing_position -> ('a -> unit, [> Core.read ]) Bus.t -> f:('a -> 'b option) -> 'b Async_kernel.Pipe.Reader.t

pipe1_filter_map_exn is the filter-mapping version of pipe1_exn: it lets users filter_map the values without incurring the cost of an additional pipe.
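The following sketch shows the filtering done at subscription time instead of with a separate Pipe.filter_map stage. The helper name ints_of_lines is hypothetical. The code assumes the core and async libraries and ppx_jane (for [%here]).

```ocaml
open! Core
open! Async

(* [ints_of_lines] is a hypothetical helper, not part of Async_bus. It keeps
   only the published lines that parse as integers and drops the rest, using a
   single pipe fed directly from the bus. *)
let ints_of_lines (bus : (string -> unit, [> read ]) Bus.t) : int Pipe.Reader.t =
  Async_bus.pipe1_filter_map_exn ~here:[%here] bus ~f:(fun line ->
    (* [Int.of_string] raises on malformed input; turn that into [None]. *)
    Option.try_with (fun () -> Int.of_string line))
```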

val pipe2_filter_map_exn : ?stop:unit Async_kernel.Deferred.t -> here:lexing_position -> ('a -> 'b -> unit, [> Core.read ]) Bus.t -> f:('a -> 'b -> 'c option) -> 'c Async_kernel.Pipe.Reader.t

pipe2_filter_map_exn is like pipe1_filter_map_exn, but works on buses with arity 2.
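For an arity-2 bus, a sketch along the same lines might look like this. The (symbol, price) event shape and the helper name prices_for are illustrative assumptions. The optional stop argument is left out here.

```ocaml
open! Core
open! Async

(* [prices_for] is a hypothetical helper, not part of Async_bus. Given a bus
   whose callbacks receive a symbol and a price, it returns a pipe carrying
   only the prices published for [symbol]. *)
let prices_for (bus : (string -> float -> unit, [> read ]) Bus.t) ~symbol
  : float Pipe.Reader.t
  =
  Async_bus.pipe2_filter_map_exn ~here:[%here] bus ~f:(fun sym price ->
    if String.equal sym symbol then Some price else None)
```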

module First_arity : sig ... end

val first_exn : ?stop:unit Async_kernel.Deferred.t -> here:lexing_position -> ('c, [> Core.read ]) Bus.t -> ('c, 'f, 'r) First_arity.t -> f:'f -> 'r Async_kernel.Deferred.t

first_exn here t arity ~f returns a deferred that becomes determined with value r when the first event is published to t where f returns Some r. first_exn then unsubscribes from t, ensuring that f is never called again after it returns Some. first_exn raises if it can't subscribe to the bus, i.e., if subscribe_exn raises. If f raises, then first_exn raises to the monitor in effect when first_exn was called. first_exn takes time proportional to the number of bus subscribers.

If stop is provided and becomes determined, first_exn unsubscribes from the bus, f will not be called again, and the deferred that was returned by first_exn will never become determined.
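Because the result never becomes determined once stop fires, a caller that needs to notice the timeout has to watch stop itself. The sketch below shows one way to do that. The helper name first_long_message is hypothetical, and First_arity.Arity1 is assumed to be the arity witness for one-argument buses, since the contents of First_arity are elided above.

```ocaml
open! Core
open! Async

(* [first_long_message] is a hypothetical helper, not part of Async_bus. It
   waits for the first message longer than [min_length] and gives up after
   [timeout]. *)
let first_long_message
  (bus : (string -> unit, [> read ]) Bus.t)
  ~min_length
  ~timeout
  : [ `Found of string | `Timed_out ] Deferred.t
  =
  let stop = Clock.after timeout in
  let found =
    (* Assumption: [Arity1] is the [First_arity.t] constructor for buses whose
       callbacks take a single argument. *)
    Async_bus.first_exn
      ~stop
      ~here:[%here]
      bus
      Async_bus.First_arity.Arity1
      ~f:(fun msg -> if String.length msg > min_length then Some msg else None)
  in
  (* [found] stays undetermined after [stop] fires, so race the two. *)
  Deferred.any
    [ Deferred.map found ~f:(fun msg -> `Found msg)
    ; Deferred.map stop ~f:(fun () -> `Timed_out)
    ]
```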