Rpc.Connection
include module type of struct include Async_rpc_kernel.Rpc.Connection end
val sexp_of_t : t -> Sexplib0.Sexp.t
module Close_reason = Async_rpc_kernel.Rpc.Connection.Close_reason
module Heartbeat_config = Async_rpc_kernel.Rpc.Connection.Heartbeat_config
module Heartbeat_timeout_style = Async_rpc_kernel.Rpc.Connection.Heartbeat_timeout_style
module Client_implementations = Async_rpc_kernel.Rpc.Connection.Client_implementations
val description : t -> Core.Info.t
val add_heartbeat_callback : t -> (unit -> unit) -> unit
After add_heartbeat_callback t f, f () will be called after every subsequent heartbeat received by t.
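As a minimal sketch of add_heartbeat_callback, the helper below records when the most recent heartbeat arrived. The name track_heartbeats is hypothetical and not part of the library; it assumes the usual Core and Async opens.

```ocaml
open Core
open Async

(* [track_heartbeats] is a hypothetical helper, not a library function.
   It returns a ref that holds the arrival time of the latest heartbeat
   received after the callback was registered, or None if none has
   arrived yet. *)
let track_heartbeats (conn : Rpc.Connection.t) : Time_ns.t option ref =
  let last_heartbeat = ref None in
  Rpc.Connection.add_heartbeat_callback conn (fun () ->
    last_heartbeat := Some (Time_ns.now ()));
  last_heartbeat
```

Only heartbeats received after registration are observed, matching the "every subsequent heartbeat" wording above.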
val reset_heartbeat_timeout : t -> Core.Time_ns.Span.t -> unit
Changes the heartbeat timeout and restarts the timer by setting last_seen_alive to the current time.
val effective_heartbeat_timeout : t -> Core.Time_ns.Span.t
If no environment override is specified, this is the heartbeat config timeout. Otherwise, this is the maximum of the environment override and the heartbeat config timeout.
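The rule above can be modelled in a few lines. This is an illustrative model only, not the library's implementation: spans are represented as plain seconds (float) rather than Core.Time_ns.Span.t, and effective_timeout is a hypothetical name.

```ocaml
(* Hypothetical model of the documented rule. Spans are seconds as floats;
   the real function works on Core.Time_ns.Span.t. *)
let effective_timeout ~(config_timeout : float) ~(env_override : float option)
  : float
  =
  match env_override with
  | None -> config_timeout
  | Some override -> Float.max override config_timeout
```

Under this model an override can lengthen the timeout but never shorten it below the configured value.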
val last_seen_alive : t -> Core.Time_ns.t
The most recent time at which either a message was received or reset_heartbeat_timeout was called.
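A short sketch of how last_seen_alive and reset_heartbeat_timeout might be used together. Both helper names (idle_for, allow_long_pause) and the five-minute value are assumptions chosen for illustration.

```ocaml
open Core
open Async

(* Hypothetical helper: how long it has been since the connection was last
   seen alive. *)
let idle_for (conn : Rpc.Connection.t) : Time_ns.Span.t =
  Time_ns.diff (Time_ns.now ()) (Rpc.Connection.last_seen_alive conn)

(* Hypothetical helper: install a longer timeout before a pause that is
   known in advance. Per the documentation this also restarts the timer. *)
let allow_long_pause (conn : Rpc.Connection.t) : unit =
  Rpc.Connection.reset_heartbeat_timeout conn (Time_ns.Span.of_min 5.)
```

Because reset_heartbeat_timeout sets last_seen_alive to the current time, idle_for would be expected to report a value near zero right after allow_long_pause is called.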
val close_with_reason :
?streaming_responses_flush_timeout:Core.Time_ns.Span.t ->
?wait_for_open_queries_timeout:Core.Time_ns.Span.t ->
?reason:Close_reason.Protocol.t ->
t ->
unit Async_kernel.Deferred.t
close_with_reason starts closing the connection's transport, and returns a deferred that becomes determined when its close completes. It is ok to call close_with_reason multiple times on the same t; calls subsequent to the initial call will have no effect, but will return the same deferred as the original call.
If wait_for_open_queries_timeout is set, close will wait for any open queries (both to and from the remote peer) to complete before closing the transport. During that time any new queries will fail immediately and is_closed t will return true even if there still are open queries.
Before closing the underlying transport's writer, close_with_reason waits for all streaming responses to be Pipe.upstream_flushed with a timeout of streaming_responses_flush_timeout.
The reason for closing the connection will be passed to callers of close_reason.
val close :
?streaming_responses_flush_timeout:Core.Time_ns.Span.t ->
?wait_for_open_queries_timeout:Core.Time_ns.Span.t ->
?reason_kind:Close_reason.Protocol.Kind.t ->
?reason:Core.Info.t ->
t ->
unit Async_kernel.Deferred.t
close starts closing the connection's transport, and returns a deferred that becomes determined when its close completes. It is ok to call close multiple times on the same t; calls subsequent to the initial call will have no effect, but will return the same deferred as the original call.
If wait_for_open_queries_timeout is set, close will wait for any open queries (both to and from the remote peer) to complete before closing the transport. During that time any new queries will fail immediately and is_closed t will return true even if there still are open queries.
Before closing the underlying transport's writer, close waits for all streaming responses to be Pipe.upstream_flushed with a timeout of streaming_responses_flush_timeout.
The reason for closing the connection will be passed to callers of close_reason.
The reason_kind allows for providing a more explicit variant of reason for the connection being closed, and is observable to clients as part of Close_reason.t produced by close_reason_structured.
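The optional arguments of close can be combined as in the sketch below. The helper name close_gracefully, the timeout values, and the reason text are all assumptions for illustration.

```ocaml
open Core
open Async

(* Hypothetical helper: give open queries up to 5s to complete and streaming
   responses up to 2s to flush, then close the transport with a
   human-readable reason. *)
let close_gracefully (conn : Rpc.Connection.t) : unit Deferred.t =
  Rpc.Connection.close
    ~wait_for_open_queries_timeout:(Time_ns.Span.of_sec 5.)
    ~streaming_responses_flush_timeout:(Time_ns.Span.of_sec 2.)
    ~reason:(Info.of_string "planned shutdown")
    conn
```

Since repeated calls return the same deferred as the first, calling such a helper from several shutdown paths should be harmless.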
val close_finished : t -> unit Async_kernel.Deferred.t
close_finished becomes determined after the close of the connection's transport completes, i.e. the same deferred that close returns (although it also becomes determined if the other side closes the connection). close_finished differs from close in that it does not have the side effect of initiating a close.
val close_reason :
t ->
on_close:[ `started | `finished ] ->
Core.Info.t Async_kernel.Deferred.t
close_reason ~on_close t becomes determined when close starts or finishes based on on_close, but additionally returns the reason that the connection was closed.
val close_reason_structured :
t ->
on_close:[ `started | `finished ] ->
Close_reason.t Async_kernel.Deferred.t
close_reason_structured ~on_close t becomes determined when close starts or finishes based on on_close, but additionally returns the reason that the connection was closed.
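As a sketch of observing a close without initiating one, the helper below logs the reason as soon as closing starts. The name log_close is hypothetical, and the use of Log.Global.info is an assumption about how the caller wants to report it.

```ocaml
open Core
open Async

(* Hypothetical helper: report why the connection is closing. It only
   observes the close; it never triggers one. *)
let log_close (conn : Rpc.Connection.t) : unit =
  upon
    (Rpc.Connection.close_reason conn ~on_close:`started)
    (fun reason ->
      Log.Global.info "RPC connection closing: %s" (Info.to_string_hum reason))
```

Passing `finished instead would delay the report until the transport has actually been closed.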
val is_closed : t -> bool
is_closed t returns true iff close t has been called. close may be called internally upon errors or timeouts.
val bytes_to_write : t -> int
bytes_to_write and flushed just call the similarly named functions on the Transport.Writer.t within a connection.
val bytes_written : t -> Core.Int63.t
bytes_written just calls the similarly named function on the Transport.Writer.t within a connection.
val bytes_read : t -> Core.Int63.t
bytes_read just calls the similarly named function on the Transport.Reader.t within a connection.
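The three counters can be combined into a one-line summary, for example for periodic logging. io_summary is a hypothetical helper name and the output format is arbitrary.

```ocaml
open Core
open Async

(* Hypothetical helper: snapshot of the connection's I/O counters.
   bytes_to_write is an int; the cumulative counters are Int63.t. *)
let io_summary (conn : Rpc.Connection.t) : string =
  sprintf
    "pending=%d written=%s read=%s"
    (Rpc.Connection.bytes_to_write conn)
    (Int63.to_string (Rpc.Connection.bytes_written conn))
    (Int63.to_string (Rpc.Connection.bytes_read conn))
```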
val flushed : t -> unit Async_kernel.Deferred.t
Peer menu will become determined before any other messages are received. The menu is sent automatically on creation of a connection. If the peer is using an older version, the value is immediately determined to be None. If the connection is closed before the menu is received, an error is returned.
It is expected that one will call Versioned_rpc.Connection_with_menu.create instead of this function and that will request the menu via rpc if it gets None.
Like peer_menu but returns an rpc result
val peer_identification : t -> Core.Bigstring.t option Async_kernel.Deferred.t
Peer identification will become determined before any other messages are received. If the peer is using an older version, the peer id is immediately determined to be None. If the connection is closed before the menu is received, None is returned.
val create :
?implementations:'s Implementations.t ->
connection_state:(t -> 's) ->
?max_message_size:int ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
?provide_rpc_shapes:bool ->
?heartbeat_timeout_style:Heartbeat_timeout_style.t ->
?validate_connection:
(identification_from_peer:Core.Bigstring.t option ->
unit Or_not_authorized.t Async_kernel.Deferred.t) ->
Async_unix.Reader.t ->
Async_unix.Writer.t ->
(t, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t
These functions are mostly the same as the ones with the same names in Async_rpc_kernel.Rpc.Connection; see Connection_intf in that library for documentation. The differences are that:
- they take an Async_unix.Reader.t, Async_unix.Writer.t and max_message_size instead of a Transport.t
- they use Time instead of Time_ns
val contains_magic_prefix : Async_unix.Reader.t -> bool Async_kernel.Deferred.t
As of Feb 2017, the RPC protocol started to contain a magic number so that one can identify RPC communication. The bool returned by contains_magic_prefix says whether this magic number was observed.
This operation is a "peek" that does not advance any pointers associated with the reader. In particular, it makes sense to call create on a reader after calling this function.
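The peek-then-create pattern described above might look like the following sketch. The helper name create_if_rpc and the Failure message are assumptions; the connection state is unit and no implementations are supplied, purely to keep the example small.

```ocaml
open Core
open Async

(* Hypothetical helper: only attempt an RPC handshake if the reader appears
   to carry the RPC magic number. Peeking does not consume input, so the
   same reader can be handed to [create] afterwards. *)
let create_if_rpc (reader : Reader.t) (writer : Writer.t)
  : (Rpc.Connection.t, exn) Result.t Deferred.t
  =
  Rpc.Connection.contains_magic_prefix reader
  >>= function
  | false -> return (Error (Failure "stream does not start with the RPC magic number"))
  | true -> Rpc.Connection.create ~connection_state:(fun _conn -> ()) reader writer
```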
val with_close :
?implementations:'s Implementations.t ->
?max_message_size:int ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?provide_rpc_shapes:bool ->
?heartbeat_timeout_style:Heartbeat_timeout_style.t ->
?validate_connection:
(identification_from_peer:Core.Bigstring.t option ->
unit Or_not_authorized.t Async_kernel.Deferred.t) ->
connection_state:(t -> 's) ->
Async_unix.Reader.t ->
Async_unix.Writer.t ->
dispatch_queries:(t -> 'a Async_kernel.Deferred.t) ->
on_handshake_error:
[ `Raise | `Call of Core.Exn.t -> 'a Async_kernel.Deferred.t ] ->
'a Async_kernel.Deferred.t
val server_with_close :
?max_message_size:int ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?provide_rpc_shapes:bool ->
?heartbeat_timeout_style:Heartbeat_timeout_style.t ->
?validate_connection:
(identification_from_peer:Core.Bigstring.t option ->
unit Or_not_authorized.t Async_kernel.Deferred.t) ->
Async_unix.Reader.t ->
Async_unix.Writer.t ->
implementations:'s Implementations.t ->
connection_state:(t -> 's) ->
on_handshake_error:
[ `Raise | `Ignore | `Call of Core.Exn.t -> unit Async_kernel.Deferred.t ] ->
unit Async_kernel.Deferred.t
type transport_maker = Async_unix.Fd.t -> max_message_size:int -> Transport.t
A function creating a transport from a file descriptor. It is responsible for setting the low-level parameters of the underlying transport.
For instance, to set up a transport using Async.{Reader,Writer} and set a buffer age limit on the writer, you can pass this to the functions of this module:
  ~make_transport:(fun fd ~max_message_size ->
    Rpc.Transport.of_fd fd ~max_message_size ~buffer_age_limit:`Unlimited)
val serve :
implementations:'s Implementations.t ->
initial_connection_state:('address -> t -> 's) ->
where_to_listen:('address, 'listening_on) Async_unix.Tcp.Where_to_listen.t ->
?max_connections:int ->
?max_accepts_per_batch:int ->
?backlog:int ->
?drop_incoming_connections:bool ->
?time_source:[> Core.read ] Async_kernel.Time_source.T1.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?auth:('address -> bool Async_kernel.Deferred.t) ->
?on_handshake_error:[ `Raise | `Ignore | `Call of 'address -> exn -> unit ] ->
?on_initial_connection_state_error:
[ `Raise | `Ignore | `Call of 'address -> exn -> unit ] ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
?provide_rpc_shapes:bool ->
?heartbeat_timeout_style:Heartbeat_timeout_style.t ->
?validate_connection:
(identification_from_peer:Core.Bigstring.t option ->
unit Or_not_authorized.t Async_kernel.Deferred.t) ->
unit ->
('address, 'listening_on) Async_unix.Tcp.Server.t Async_kernel.Deferred.t
serve ~implementations ~initial_connection_state ~where_to_listen () starts a server with the given implementations, listening at where_to_listen. The optional auth function is called on every incoming connection with the client's address info; if it returns false, the client is disconnected immediately. This auth mechanism is generic and does nothing other than disconnect the client: any logging or record of the reasons is the responsibility of the auth function itself.
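A minimal sketch of starting a TCP server with an auth check. The helper name start_server and its allow parameter are hypothetical; the implementations value is assumed to be built elsewhere by the caller, and the connection state is unit for brevity.

```ocaml
open Core
open Async

(* Hypothetical helper: serve [implementations] on [port], rejecting any
   client whose address [allow] does not accept. Handshake failures are
   ignored rather than raised. *)
let start_server
  ~(implementations : unit Rpc.Implementations.t)
  ~(port : int)
  ~(allow : Socket.Address.Inet.t -> bool)
  =
  Rpc.Connection.serve
    ~implementations
    ~initial_connection_state:(fun _addr _conn -> ())
    ~where_to_listen:(Tcp.Where_to_listen.of_port port)
    ~auth:(fun addr -> return (allow addr))
    ~on_handshake_error:`Ignore
    ()
```

As the documentation notes, a rejected client is simply disconnected, so any logging of rejections would have to happen inside allow.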
val serve_inet :
implementations:'s Implementations.t ->
initial_connection_state:(Async_unix.Socket.Address.Inet.t -> t -> 's) ->
where_to_listen:Async_unix.Tcp.Where_to_listen.inet ->
?max_connections:int ->
?max_accepts_per_batch:int ->
?backlog:int ->
?drop_incoming_connections:bool ->
?time_source:[> Core.read ] Async_kernel.Time_source.T1.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?auth:(Async_unix.Socket.Address.Inet.t -> bool Async_kernel.Deferred.t) ->
?on_handshake_error:
[ `Raise
| `Ignore
| `Call of Async_unix.Socket.Address.Inet.t -> exn -> unit ] ->
?on_initial_connection_state_error:
[ `Raise
| `Ignore
| `Call of Async_unix.Socket.Address.Inet.t -> exn -> unit ] ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
?provide_rpc_shapes:bool ->
?heartbeat_timeout_style:Heartbeat_timeout_style.t ->
?validate_connection:
(identification_from_peer:Core.Bigstring.t option ->
unit Or_not_authorized.t Async_kernel.Deferred.t) ->
unit ->
(Async_unix.Socket.Address.Inet.t, int) Async_unix.Tcp.Server.t
As serve, but only accepts IP addresses, not Unix sockets; returns the server immediately rather than asynchronously.
val serve_unix :
implementations:'s Implementations.t ->
initial_connection_state:
(Async_unix.Socket.Address.Unix.t ->
Linux_ext.Peer_credentials.t ->
t ->
's) ->
where_to_listen:Async_unix.Tcp.Where_to_listen.unix ->
?max_connections:int ->
?max_accepts_per_batch:int ->
?backlog:int ->
?drop_incoming_connections:bool ->
?time_source:[> Core.read ] Async_kernel.Time_source.T1.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?auth:(Async_unix.Socket.Address.Unix.t -> bool Async_kernel.Deferred.t) ->
?on_handshake_error:
[ `Raise
| `Ignore
| `Call of Async_unix.Socket.Address.Unix.t -> exn -> unit ] ->
?on_initial_connection_state_error:
[ `Raise
| `Ignore
| `Call of Async_unix.Socket.Address.Unix.t -> exn -> unit ] ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
?provide_rpc_shapes:bool ->
?heartbeat_timeout_style:Heartbeat_timeout_style.t ->
?validate_connection:
(identification_from_peer:Core.Bigstring.t option ->
unit Or_not_authorized.t Async_kernel.Deferred.t) ->
unit ->
Async_unix.Tcp.Server.unix Async_kernel.Deferred.t
As serve, but only accepts Unix sockets; provides peer credentials of the socket to initial_connection_state.
val client :
?implementations:Client_implementations.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
?provide_rpc_shapes:bool ->
?heartbeat_timeout_style:Heartbeat_timeout_style.t ->
?validate_connection:
(identification_from_peer:Core.Bigstring.t option ->
unit Or_not_authorized.t Async_kernel.Deferred.t) ->
_ Async_unix.Tcp.Where_to_connect.t ->
(t, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t
client where_to_connect connects to the server at where_to_connect and returns the connection, or an Error if a connection could not be made. It is the responsibility of the caller to eventually call close.
In client and with_client, the handshake_timeout encompasses both the TCP connection timeout and the timeout for this module's own handshake.
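Because client leaves closing to the caller, one possible pattern is to pair it with Monitor.protect so the connection is closed even if the caller's function raises. The helper name with_manual_close and the 10-second handshake timeout are assumptions.

```ocaml
open Core
open Async

(* Hypothetical helper: connect, run [f], and always close the connection
   afterwards, whether [f] succeeds or raises. *)
let with_manual_close ~host ~port ~(f : Rpc.Connection.t -> 'a Deferred.t)
  : ('a, exn) Result.t Deferred.t
  =
  let where =
    Tcp.Where_to_connect.of_host_and_port (Host_and_port.create ~host ~port)
  in
  Rpc.Connection.client ~handshake_timeout:(Time_float.Span.of_sec 10.) where
  >>= function
  | Error exn -> return (Error exn)
  | Ok conn ->
    Monitor.protect
      (fun () -> f conn)
      ~finally:(fun () -> Rpc.Connection.close conn)
    >>| fun result -> Ok result
```

Unlike with_client, an exception raised by f propagates to the caller's monitor here instead of being returned as an Error.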
val client' :
?implementations:Client_implementations.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
?provide_rpc_shapes:bool ->
?heartbeat_timeout_style:Heartbeat_timeout_style.t ->
?validate_connection:
(identification_from_peer:Core.Bigstring.t option ->
unit Or_not_authorized.t Async_kernel.Deferred.t) ->
'address Async_unix.Tcp.Where_to_connect.t ->
('address * t, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t
Similar to client, but additionally exposes the Socket.Address.t of the RPC server that we connected to.
val with_client :
?implementations:Client_implementations.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
?provide_rpc_shapes:bool ->
?heartbeat_timeout_style:Heartbeat_timeout_style.t ->
?validate_connection:
(identification_from_peer:Core.Bigstring.t option ->
unit Or_not_authorized.t Async_kernel.Deferred.t) ->
_ Async_unix.Tcp.Where_to_connect.t ->
(t -> 'a Async_kernel.Deferred.t) ->
('a, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t
with_client where_to_connect f connects to the server at where_to_connect and runs f until an exception is raised or until the returned Deferred is determined.
NOTE: As with with_close, you should be careful when using this with Pipe_rpc. See with_close for more information.
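A small sketch of with_client that needs no RPC definitions: it counts the heartbeats received during a fixed window. The helper name count_heartbeats and its duration parameter are hypothetical.

```ocaml
open Core
open Async

(* Hypothetical helper: connect, count heartbeats received for [duration],
   and return the count. The function passed to [with_client] finishes when
   its deferred becomes determined. *)
let count_heartbeats where ~(duration : Time_ns.Span.t)
  : (int, exn) Result.t Deferred.t
  =
  Rpc.Connection.with_client where (fun conn ->
    let count = ref 0 in
    Rpc.Connection.add_heartbeat_callback conn (fun () -> incr count);
    Clock_ns.after duration >>| fun () -> !count)
```

The result is an Error if the connection could not be established or if the function raised.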
val with_client' :
?implementations:Client_implementations.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
?provide_rpc_shapes:bool ->
?heartbeat_timeout_style:Heartbeat_timeout_style.t ->
?validate_connection:
(identification_from_peer:Core.Bigstring.t option ->
unit Or_not_authorized.t Async_kernel.Deferred.t) ->
'transport Async_unix.Tcp.Where_to_connect.t ->
(remote_server:'transport -> t -> 'a Async_kernel.Deferred.t) ->
('a, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t
Similar to with_client, but additionally exposes the Socket.Address.t of the RPC server that we connected to.