Documentation
¶
Overview ¶
Package rpc implements remote procedure calls over TCP, UNIX, HTTP and WS.
Index ¶
- Constants
- Variables
- func FreeContextBuffer(ctx context.Context)
- func GetBuffer(size int) []byte
- func GetContextBuffer(ctx context.Context) (buffer []byte)
- func Listen(network, address string, codec string) error
- func ListenTLS(network, address string, codec string, config *tls.Config) error
- func ListenWithOptions(address string, opts *Options) error
- func NewCodec(name string) func() Codec
- func NewHeaderEncoder(name string) func() *Encoder
- func NewSocket(network string) func(config *tls.Config) socket.Socket
- func Push(key string, value []byte)
- func PushFunc(watchFunc WatchFunc)
- func PutBuffer(buf []byte)
- func PutCall(call *Call)
- func Register(rcvr interface{}) error
- func RegisterCodec(name string, New func() Codec)
- func RegisterHeaderEncoder(name string, New func() *Encoder)
- func RegisterName(name string, rcvr interface{}) error
- func RegisterSocket(network string, New func(config *tls.Config) socket.Socket)
- func ResetDone(done chan *Call)
- func ServeCodec(codec ServerCodec)
- func Services() []string
- func SetBufferSize(size int)
- func SetContextBuffer(shared bool)
- func SetLogLevel(level LogLevel)
- func SetNoBatch(noBatch bool)
- func SetNoCopy(noCopy bool)
- func SetPipelining(enable bool)
- func SetPoll(enable bool)
- type BYTESCodec
- type CODECodec
- type Call
- type Client
- func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) error
- func (c *Client) CallWithContext(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
- func (c *Client) Close() (err error)
- func (c *Client) Fallback(d time.Duration)
- func (c *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
- func (c *Client) Ping() error
- func (c *Client) RoundTrip(call *Call) *Call
- func (c *Client) Update(targets ...string)
- func (c *Client) Watch(key string) (Watcher, error)
- type ClientCodec
- type Code
- type Codec
- type Conn
- func (conn *Conn) Call(serviceMethod string, args interface{}, reply interface{}) error
- func (conn *Conn) CallWithContext(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
- func (conn *Conn) Close() (err error)
- func (conn *Conn) Dial(s socket.Socket, address string, New NewClientCodecFunc) (*Conn, error)
- func (conn *Conn) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
- func (conn *Conn) NumCalls() (n uint64)
- func (conn *Conn) Ping() error
- func (conn *Conn) RoundTrip(call *Call) *Call
- func (conn *Conn) Watch(key string) (Watcher, error)
- type Context
- type Encoder
- type GOGOPBCodec
- type GoGoProtobuf
- type JSONCodec
- type LogLevel
- type MSGPCodec
- type MsgPack
- type NewClientCodecFunc
- type NewServerCodecFunc
- type Options
- type Request
- type Response
- type RoundTripper
- type Scheduling
- type Server
- func (server *Server) Close() error
- func (server *Server) GetLogLevel() LogLevel
- func (server *Server) Listen(network, address string, codec string) error
- func (server *Server) ListenTLS(network, address string, codec string, config *tls.Config) error
- func (server *Server) ListenWithOptions(address string, opts *Options) error
- func (server *Server) Push(key string, value []byte)
- func (server *Server) PushFunc(watchFunc WatchFunc)
- func (server *Server) Register(obj interface{}) error
- func (server *Server) RegisterName(name string, obj interface{}) error
- func (server *Server) ServeCodec(codec ServerCodec)
- func (server *Server) ServeRequest(codec ServerCodec, recving *sync.Mutex, sending *sync.Mutex, ...) error
- func (server *Server) Services() []string
- func (server *Server) SetBufferSize(size int)
- func (server *Server) SetContextBuffer(shared bool)
- func (server *Server) SetLogLevel(level LogLevel)
- func (server *Server) SetNoBatch(noBatch bool)
- func (server *Server) SetNoCopy(noCopy bool)
- func (server *Server) SetPipelining(enable bool)
- func (server *Server) SetPoll(enable bool)
- type ServerCodec
- type Transport
- func (t *Transport) Call(addr, serviceMethod string, args interface{}, reply interface{}) error
- func (t *Transport) CallWithContext(ctx context.Context, addr string, serviceMethod string, args interface{}, ...) error
- func (t *Transport) Close() error
- func (t *Transport) CloseIdleConnections()
- func (t *Transport) Go(addr, serviceMethod string, args interface{}, reply interface{}, ...) *Call
- func (t *Transport) Ping(addr string) error
- func (t *Transport) RoundTrip(addr string, call *Call) *Call
- func (t *Transport) Watch(addr, key string) (Watcher, error)
- type WatchFunc
- type Watcher
- type XMLCodec
Constants ¶
const ( //DefaultMaxConnsPerHost is the default value of Transport's MaxConnsPerHost. DefaultMaxConnsPerHost = 1 //DefaultMaxIdleConnsPerHost is the default value of Transport's MaxIdleConnsPerHost. DefaultMaxIdleConnsPerHost = 1 //DefaultKeepAlive is the default value of Transport's KeepAlive. DefaultKeepAlive = 90 * time.Second //DefaultIdleConnTimeout is the default value of Transport's IdleConnTimeout. DefaultIdleConnTimeout = 60 * time.Second )
Variables ¶
var ( // MaxConnsPerHost optionally limits the total number of // connections per host, including connections in the dialing, // active, and idle states. On limit violation, dials will block. MaxConnsPerHost = DefaultMaxConnsPerHost // MaxIdleConnsPerHost controls the maximum idle // (keep-alive) connections to keep per-host. If zero, // DefaultMaxIdleConnsPerHost is used. MaxIdleConnsPerHost = DefaultMaxIdleConnsPerHost // KeepAlive specifies the maximum amount of time keeping the active connections in the Transport's conns. KeepAlive = DefaultKeepAlive // IdleConnTimeout specifies the maximum amount of time keeping the idle connections in the Transport's idleConns. IdleConnTimeout = DefaultIdleConnTimeout // ErrDial is returned when dial failed. ErrDial = errors.New("dial failed") )
var BufferContextKey = &contextKey{"buffer"}
BufferContextKey is a context key.
var DefaultServer = NewServer()
DefaultServer is the default instance of *Server.
var DefaultTransport = &Transport{ MaxConnsPerHost: MaxConnsPerHost, MaxIdleConnsPerHost: MaxIdleConnsPerHost, KeepAlive: KeepAlive, IdleConnTimeout: IdleConnTimeout, Network: "tcp", Codec: "json", Dial: Dial, DialWithOptions: DialWithOptions, }
DefaultTransport is a default RPC transport.
var ErrShutdown = errors.New(shutdownMsg)
ErrShutdown is returned when the connection is shut down.
var ErrTimeout = errors.New("timeout")
ErrTimeout is returned after the timeout.
var ErrWatch = errors.New("The watch is existed")
ErrWatch is returned when the watch already exists.
var ErrWatcherShutdown = errors.New("The watcher is shut down")
ErrWatcherShutdown is returned when the watcher is shut down.
var ErrorCODE = errors.New("is not Code")
ErrorCODE is the error indicating that v is not a Code.
var ErrorGOGOPB = errors.New("is not GoGoProtobuf")
ErrorGOGOPB is the error indicating that v is not a GoGoProtobuf.
var ErrorMSGP = errors.New("is not MSGP")
ErrorMSGP is the error indicating that v is not MSGP.
Functions ¶
func FreeContextBuffer ¶ added in v0.0.2
FreeContextBuffer frees the context buffer to the pool.
func GetContextBuffer ¶ added in v0.0.2
GetContextBuffer gets a buffer from the context.
func ListenWithOptions ¶
ListenWithOptions announces on the local network address with Options.
func NewHeaderEncoder ¶
NewHeaderEncoder returns a new header Encoder.
func Register ¶
func Register(rcvr interface{}) error
Register publishes the receiver's methods in the DefaultServer.
func RegisterCodec ¶
RegisterCodec registers a codec.
func RegisterHeaderEncoder ¶
RegisterHeaderEncoder registers a header Encoder.
func RegisterName ¶
RegisterName is like Register but uses the provided name for the type instead of the receiver's concrete type.
func RegisterSocket ¶
RegisterSocket registers a network socket.
func ServeCodec ¶
func ServeCodec(codec ServerCodec)
ServeCodec uses the specified codec to decode requests and encode responses.
func SetContextBuffer ¶ added in v0.0.2
func SetContextBuffer(shared bool)
SetContextBuffer sets shared buffer.
func SetNoBatch ¶
func SetNoBatch(noBatch bool)
SetNoBatch disables the Server's use of the batch writer.
func SetNoCopy ¶ added in v0.0.2
func SetNoCopy(noCopy bool)
SetNoCopy reuses a buffer from the pool to minimize memory allocations. The RPC handler takes ownership of the buffer, and the handler should not use the buffer after handling the request. The default option is to make a copy of the data for every RPC handler.
func SetPipelining ¶
func SetPipelining(enable bool)
SetPipelining enables the Server to use pipelining.
Types ¶
type BYTESCodec ¶
type BYTESCodec struct {
}
BYTESCodec struct
func (*BYTESCodec) Marshal ¶
func (c *BYTESCodec) Marshal(buf []byte, v interface{}) ([]byte, error)
Marshal returns the BYTES encoding of v.
func (*BYTESCodec) Unmarshal ¶
func (c *BYTESCodec) Unmarshal(data []byte, v interface{}) error
Unmarshal parses the BYTES-encoded data and stores the result in the value pointed to by v.
type CODECodec ¶
type CODECodec struct {
}
CODECodec struct
type Call ¶
type Call struct {
Buffer []byte
Value []byte
ServiceMethod string
Args interface{}
Reply interface{}
CallError bool
Error error
Done chan *Call
// contains filtered or unexported fields
}
Call represents an active RPC.
type Client ¶
type Client struct {
Director func() (target string)
Transport RoundTripper
Scheduling Scheduling
Tick time.Duration
Alpha float64
DialTimeout time.Duration
// contains filtered or unexported fields
}
Client is an RPC client.
The Client's Transport typically has internal state (cached connections), so Clients should be reused instead of created as needed. Clients are safe for concurrent use by multiple goroutines.
A Client is higher-level than a RoundTripper such as Transport.
func (*Client) Call ¶
Call invokes the named function, waits for it to complete, and returns its error status.
func (*Client) CallWithContext ¶ added in v0.0.2
func (c *Client) CallWithContext(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
CallWithContext acts like Call but takes a context.
func (*Client) Go ¶
func (c *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.
func (*Client) Ping ¶
Ping is NOT ICMP ping, this is just used to test whether a connection is still alive.
type ClientCodec ¶
type ClientCodec interface {
WriteRequest(*Context, interface{}) error
ReadResponseHeader(*Context) error
ReadResponseBody([]byte, interface{}) error
Close() error
}
ClientCodec implements writing of RPC requests and reading of RPC responses for the client side of an RPC session. The client calls WriteRequest to write a request to the connection and calls ReadResponseHeader and ReadResponseBody in pairs to read responses. The client calls Close when finished with the connection. ReadResponseBody may be called with a nil argument to force the body of the response to be read and then discarded.
func NewClientCodec ¶
func NewClientCodec(bodyCodec Codec, headerEncoder *Encoder, messages socket.Messages) ClientCodec
NewClientCodec returns a new ClientCodec.
type Codec ¶
type Codec interface {
Marshal(buf []byte, v interface{}) ([]byte, error)
Unmarshal(data []byte, v interface{}) error
}
Codec defines the interface for encoding/decoding.
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn represents an RPC Conn. There may be multiple outstanding Calls associated with a single Conn, and a Conn may be used by multiple goroutines simultaneously.
func DialWithOptions ¶
DialWithOptions connects to an RPC server at the specified network address with Options.
func NewConn ¶
func NewConn() *Conn
NewConn returns a new Conn to handle requests to the set of services at the other end of the connection. It adds a buffer to the write side of the connection so the header and payload are sent as a unit.
The read and write halves of the connection are serialized independently, so no interlocking is required. However each half may be accessed concurrently so the implementation of conn should protect against concurrent reads or concurrent writes.
func NewConnWithCodec ¶
func NewConnWithCodec(codec ClientCodec) *Conn
NewConnWithCodec is like NewConn but uses the specified codec to encode requests and decode responses.
func (*Conn) Call ¶
Call invokes the named function, waits for it to complete, and returns its error status.
func (*Conn) CallWithContext ¶ added in v0.0.2
func (conn *Conn) CallWithContext(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
CallWithContext acts like Call but takes a context.
func (*Conn) Close ¶
Close calls the underlying codec's Close method. If the connection is already shutting down, ErrShutdown is returned.
func (*Conn) Go ¶
func (conn *Conn) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.
func (*Conn) Ping ¶
Ping is NOT ICMP ping, this is just used to test whether a connection is still alive.
type Context ¶
type Context struct {
Seq uint64
Upgrade []byte
ServiceMethod string
Error string
// contains filtered or unexported fields
}
Context is an RPC context for codec.
type Encoder ¶
Encoder defines the struct of Encoder.
func DefaultEncoder ¶
func DefaultEncoder() *Encoder
DefaultEncoder returns a default header Encoder.
func NewEncoder ¶
NewEncoder returns the instance of Encoder.
type GOGOPBCodec ¶
type GOGOPBCodec struct {
}
GOGOPBCodec struct
func (*GOGOPBCodec) Marshal ¶
func (c *GOGOPBCodec) Marshal(buf []byte, v interface{}) ([]byte, error)
Marshal returns the GOGOPB encoding of v.
func (*GOGOPBCodec) Unmarshal ¶
func (c *GOGOPBCodec) Unmarshal(data []byte, v interface{}) error
Unmarshal parses the GOGOPB-encoded data and stores the result in the value pointed to by v.
type GoGoProtobuf ¶
type GoGoProtobuf interface {
Size() (n int)
Marshal() (data []byte, err error)
MarshalTo(buf []byte) (int, error)
Unmarshal(data []byte) error
}
GoGoProtobuf defines the interface for gogo's protobuf.
type JSONCodec ¶
type JSONCodec struct {
}
JSONCodec struct
type LogLevel ¶
type LogLevel int
LogLevel defines the level for log. Higher levels log less info.
const ( //DebugLogLevel defines the level of debug in test environments. DebugLogLevel LogLevel = 1 //TraceLogLevel defines the level of trace in test environments. TraceLogLevel LogLevel = 2 //AllLogLevel defines the lowest level in production environments. AllLogLevel LogLevel = 3 //InfoLogLevel defines the level of info. InfoLogLevel LogLevel = 4 //NoticeLogLevel defines the level of notice. NoticeLogLevel LogLevel = 5 //WarnLogLevel defines the level of warn. WarnLogLevel LogLevel = 6 //ErrorLogLevel defines the level of error. ErrorLogLevel LogLevel = 7 //PanicLogLevel defines the level of panic. PanicLogLevel LogLevel = 8 //FatalLogLevel defines the level of fatal. FatalLogLevel LogLevel = 9 //OffLogLevel defines the level of no log. OffLogLevel LogLevel = 10 )
type MSGPCodec ¶
type MSGPCodec struct {
}
MSGPCodec struct
type MsgPack ¶
type MsgPack interface {
MarshalMsg(buf []byte) ([]byte, error)
UnmarshalMsg(bts []byte) (o []byte, err error)
}
MsgPack defines the interface for msgp.
type NewClientCodecFunc ¶
type NewClientCodecFunc func(messages socket.Messages) ClientCodec
NewClientCodecFunc is a function that makes a new ClientCodec from socket.Messages.
type NewServerCodecFunc ¶
type NewServerCodecFunc func(messages socket.Messages) ServerCodec
NewServerCodecFunc is a function that makes a new ServerCodec from socket.Messages.
type Options ¶
type Options struct {
NewSocket func(*tls.Config) socket.Socket
NewCodec func() Codec
NewHeaderEncoder func() *Encoder
Network string
Codec string
HeaderEncoder string
TLSConfig *tls.Config
}
Options defines the struct of options.
type Request ¶
type Request interface {
SetSeq(uint64)
GetSeq() uint64
SetUpgrade([]byte)
GetUpgrade() []byte
SetServiceMethod(string)
GetServiceMethod() string
SetArgs([]byte)
GetArgs() []byte
Reset()
}
Request defines the interface of request.
func NewCODERequest ¶
func NewCODERequest() Request
NewCODERequest returns the instance of codeRequest.
func NewJSONRequest ¶
func NewJSONRequest() Request
NewJSONRequest returns the instance of jsonRequest.
type Response ¶
type Response interface {
SetSeq(uint64)
GetSeq() uint64
SetError(string)
GetError() string
SetReply([]byte)
GetReply() []byte
Reset()
}
Response defines the interface of response.
func NewCODEResponse ¶
func NewCODEResponse() Response
NewCODEResponse returns the instance of codeResponse.
func NewJSONResponse ¶
func NewJSONResponse() Response
NewJSONResponse returns the instance of jsonResponse.
func NewPBResponse ¶
func NewPBResponse() Response
NewPBResponse returns the instance of pbResponse.
type RoundTripper ¶
type RoundTripper interface {
RoundTrip(addr string, call *Call) *Call
Go(addr, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
Call(addr, serviceMethod string, args interface{}, reply interface{}) error
CallWithContext(ctx context.Context, addr string, serviceMethod string, args interface{}, reply interface{}) error
Watch(addr, key string) (Watcher, error)
Ping(addr string) error
Close() error
}
RoundTripper is an interface representing the ability to execute a single RPC transaction, obtaining the Response for a given Request.
type Scheduling ¶
type Scheduling int
Scheduling represents the scheduling algorithms.
const ( //RoundRobinScheduling uses the Round Robin algorithm to load balance traffic. RoundRobinScheduling Scheduling = iota //RandomScheduling randomly selects the target server. RandomScheduling //LeastTimeScheduling selects the target server with the lowest latency. LeastTimeScheduling )
type Server ¶
Server represents an RPC Server.
func (*Server) GetLogLevel ¶
GetLogLevel returns log's level.
func (*Server) ListenWithOptions ¶
ListenWithOptions announces on the local network address with Options.
func (*Server) Register ¶
Register publishes in the server the set of methods of the receiver value that satisfy the following conditions:
- exported method of exported type
- two arguments, both of exported type
- the second argument is a pointer
- one return value, of type error
It returns an error if the receiver is not an exported type or has no suitable methods. It also logs the error using package log. The client accesses each method using a string of the form "Type.Method", where Type is the receiver's concrete type.
func (*Server) RegisterName ¶
RegisterName is like Register but uses the provided name for the type instead of the receiver's concrete type.
func (*Server) ServeCodec ¶
func (server *Server) ServeCodec(codec ServerCodec)
ServeCodec uses the specified codec to decode requests and encode responses.
func (*Server) ServeRequest ¶
func (server *Server) ServeRequest(codec ServerCodec, recving *sync.Mutex, sending *sync.Mutex, wg *sync.WaitGroup, worker bool, ch chan *Context, done chan struct{}, workers chan struct{}) error
ServeRequest is like ServeCodec but synchronously serves a single request. It does not close the codec upon completion.
func (*Server) SetBufferSize ¶ added in v0.0.2
SetBufferSize sets buffer size.
func (*Server) SetContextBuffer ¶ added in v0.0.2
SetContextBuffer sets shared buffer.
func (*Server) SetLogLevel ¶
SetLogLevel sets log's level.
func (*Server) SetNoBatch ¶
SetNoBatch disables the Server's use of the batch writer.
func (*Server) SetNoCopy ¶ added in v0.0.2
SetNoCopy reuses a buffer from the pool to minimize memory allocations. The RPC handler takes ownership of the buffer, and the handler should not use the buffer after handling the request. The default value of noCopy is false, which makes a copy of the data for every RPC handler.
func (*Server) SetPipelining ¶
SetPipelining enables the Server to use pipelining.
type ServerCodec ¶
type ServerCodec interface {
ReadRequestHeader(*Context) error
ReadRequestBody([]byte, interface{}) error
WriteResponse(*Context, interface{}) error
Close() error
}
ServerCodec implements reading of RPC requests and writing of RPC responses for the server side of an RPC session. The server calls ReadRequestHeader and ReadRequestBody in pairs to read requests from the connection, and it calls WriteResponse to write a response back. The server calls Close when finished with the connection. ReadRequestBody may be called with a nil argument to force the body of the request to be read and discarded.
func NewServerCodec ¶
func NewServerCodec(bodyCodec Codec, headerEncoder *Encoder, messages socket.Messages, noBatch bool) ServerCodec
NewServerCodec returns a new ServerCodec.
type Transport ¶
type Transport struct {
MaxConnsPerHost int
MaxIdleConnsPerHost int
KeepAlive time.Duration
IdleConnTimeout time.Duration
Network string
Codec string
Dial func(network, address, codec string) (*Conn, error)
Options *Options
DialWithOptions func(address string, opts *Options) (*Conn, error)
// contains filtered or unexported fields
}
Transport defines the struct of transport.
func (*Transport) Call ¶
Call invokes the named function, waits for it to complete, and returns its error status.
func (*Transport) CallWithContext ¶ added in v0.0.2
func (t *Transport) CallWithContext(ctx context.Context, addr string, serviceMethod string, args interface{}, reply interface{}) error
CallWithContext acts like Call but takes a context.
func (*Transport) CloseIdleConnections ¶
func (t *Transport) CloseIdleConnections()
CloseIdleConnections closes the idle connections.
func (*Transport) Go ¶
func (t *Transport) Go(addr, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.
func (*Transport) Ping ¶
Ping is NOT ICMP ping, this is just used to test whether a connection is still alive.
type Watcher ¶
type Watcher interface {
// Wait will return value when the key is triggered.
Wait() ([]byte, error)
// WaitWithContext acts like Wait but takes a context.
WaitWithContext(context.Context) ([]byte, error)
// Stop stops the watch.
Stop() error
}
Watcher represents a watcher.



