Best Syzkaller code snippet using rpctype.Dial
pool.go
Source:pool.go
...
	"github.com/hashicorp/consul/tlsutil"
	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
	"github.com/hashicorp/yamux"
)

// defaultDialTimeout bounds how long a single outgoing TCP dial may take.
const defaultDialTimeout = 10 * time.Second

// muxSession is used to provide an interface for a stream multiplexer.
type muxSession interface {
	Open() (net.Conn, error)
	Close() error
}

// StreamClient is used to wrap a stream with an RPC client codec.
type StreamClient struct {
	stream net.Conn
	codec  rpc.ClientCodec
}

// Close closes the underlying stream and then the codec. Errors from either
// close are discarded because the method has no return value.
func (sc *StreamClient) Close() {
	sc.stream.Close()
	sc.codec.Close()
}

// Conn is a pooled connection to a Consul server
type Conn struct {
	refCount    int32
	shouldClose int32
	nodeName    string
	addr        net.Addr
	session     muxSession
	lastUsed    time.Time
	pool        *ConnPool
	clients     *list.List
	clientLock  sync.Mutex
}

// Close closes the multiplexed session backing this connection.
func (c *Conn) Close() error {
	return c.session.Close()
}

// getClient is used to get a cached or new client. A cached client is popped
// from the front of the idle list; otherwise a new stream is opened on the
// session and wrapped in a msgpack RPC codec.
func (c *Conn) getClient() (*StreamClient, error) {
	// Check for cached client
	c.clientLock.Lock()
	front := c.clients.Front()
	if front != nil {
		c.clients.Remove(front)
	}
	c.clientLock.Unlock()
	if front != nil {
		return front.Value.(*StreamClient), nil
	}

	// Open a new session
	stream, err := c.session.Open()
	if err != nil {
		return nil, err
	}

	// Create the RPC client
	codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle)

	// Return a new stream client
	sc := &StreamClient{
		stream: stream,
		codec:  codec,
	}
	return sc, nil
}

// returnClient is used when done with a stream client to allow re-use by a
// future RPC. The client is cached only while fewer than pool.MaxStreams
// clients are idle and the connection is not marked shouldClose; otherwise it
// is closed.
func (c *Conn) returnClient(client *StreamClient) {
	didSave := false
	c.clientLock.Lock()
	if c.clients.Len() < c.pool.MaxStreams && atomic.LoadInt32(&c.shouldClose) == 0 {
		c.clients.PushFront(client)
		didSave = true

		// If this is a Yamux stream, shrink the internal buffers so that
		// we can GC the idle memory
		if ys, ok := client.stream.(*yamux.Stream); ok {
			ys.Shrink()
		}
	}
	c.clientLock.Unlock()
	if !didSave {
		client.Close()
	}
}

// markForUse does all the bookkeeping required to ready a connection for use.
func (c *Conn) markForUse() {
	c.lastUsed = time.Now()
	atomic.AddInt32(&c.refCount, 1)
}

// ConnPool is used to maintain a connection pool to other Consul
// servers. This is used to reduce the latency of RPC requests between
// servers. It is only used to pool connections in the rpcConsul mode.
// Raft connections are pooled separately. Maintain at most one
// connection per host, for up to MaxTime. When MaxTime is not positive,
// connection reaping is disabled. MaxStreams is used to control the number
// of idle streams allowed. If TLS settings are provided outgoing connections
// use TLS.
type ConnPool struct {
	// SrcAddr is the source address for outgoing connections.
	SrcAddr *net.TCPAddr

	// LogOutput is used to control logging
	LogOutput io.Writer

	// The maximum time to keep a connection open
	MaxTime time.Duration

	// The maximum number of open streams to keep
	MaxStreams int

	// TLSConfigurator
	TLSConfigurator *tlsutil.Configurator

	// GatewayResolver is a function that returns a suitable random mesh
	// gateway address for dialing servers in a given DC. This is only
	// needed if wan federation via mesh gateways is enabled.
	GatewayResolver func(string) string

	// Datacenter is the datacenter of the current agent.
	Datacenter string

	// Server should be set to true if this connection pool is configured in a
	// server instead of a client.
	Server bool

	sync.Mutex

	// pool maps a nodeName+address to a open connection
	pool map[string]*Conn

	// limiter is used to throttle the number of connect attempts
	// to a given address. The first thread will attempt a connection
	// and put a channel in here, which all other threads will wait
	// on to close.
	limiter map[string]chan struct{}

	// Used to indicate the pool is shutdown
	shutdown   bool
	shutdownCh chan struct{}

	// once initializes the internal data structures and connection
	// reaping on first use.
	once sync.Once
}

// init configures the initial data structures. It should be called
// by p.once.Do(p.init) in all public methods.
func (p *ConnPool) init() {
	p.pool = make(map[string]*Conn)
	p.limiter = make(map[string]chan struct{})
	p.shutdownCh = make(chan struct{})
	if p.MaxTime > 0 {
		go p.reap()
	}
}

// Shutdown is used to close the connection pool. Every pooled connection is
// closed and the pool map is reset; shutdownCh is closed only on the first
// call, so calling Shutdown repeatedly is safe.
func (p *ConnPool) Shutdown() error {
	p.once.Do(p.init)

	p.Lock()
	defer p.Unlock()

	for _, conn := range p.pool {
		conn.Close()
	}
	p.pool = make(map[string]*Conn)

	if p.shutdown {
		return nil
	}
	p.shutdown = true
	close(p.shutdownCh)
	return nil
}

// acquire will return a pooled connection, if available. Otherwise it will
// wait for an existing connection attempt to finish, if one is in progress,
// and will return that one if it succeeds. If all else fails, it will return a
// newly-created connection and add it to the pool.
func (p *ConnPool) acquire(dc string, nodeName string, addr net.Addr) (*Conn, error) {
	if nodeName == "" {
		return nil, fmt.Errorf("pool: ConnPool.acquire requires a node name")
	}

	addrStr := addr.String()
	poolKey := nodeName + ":" + addrStr

	// Check to see if there's a pooled connection available. This is up
	// here since it should be the vastly more common case than the rest
	// of the code here.
	p.Lock()
	c := p.pool[poolKey]
	if c != nil {
		c.markForUse()
		p.Unlock()
		return c, nil
	}

	// If not (while we are still locked), set up the throttling structure
	// for this address, which will make everyone else wait until our
	// attempt is done.
	var wait chan struct{}
	var ok bool
	if wait, ok = p.limiter[addrStr]; !ok {
		wait = make(chan struct{})
		p.limiter[addrStr] = wait
	}
	isLeadThread := !ok
	p.Unlock()

	// If we are the lead thread, make the new connection and then wake
	// everybody else up to see if we got it.
	if isLeadThread {
		c, err := p.getNewConn(dc, nodeName, addr)
		p.Lock()
		delete(p.limiter, addrStr)
		close(wait)
		if err != nil {
			p.Unlock()
			return nil, err
		}

		p.pool[poolKey] = c
		p.Unlock()
		return c, nil
	}

	// Otherwise, wait for the lead thread to attempt the connection
	// and use what's in the pool at that point.
	select {
	case <-p.shutdownCh:
		return nil, fmt.Errorf("rpc error: shutdown")
	case <-wait:
	}

	// See if the lead thread was able to get us a connection.
	p.Lock()
	if c := p.pool[poolKey]; c != nil {
		c.markForUse()
		p.Unlock()
		return c, nil
	}

	p.Unlock()
	return nil, fmt.Errorf("rpc error: lead thread didn't get connection")
}

// HalfCloser is an interface that exposes a TCP half-close without exposing
// the underlying TLS or raw TCP connection.
type HalfCloser interface {
	CloseWrite() error
}

// DialTimeout is used to establish a raw connection to the given server, with
// given connection timeout. It also writes RPCTLS as the first byte.
//
// When this pool belongs to a server, has both a GatewayResolver and a
// TLSConfigurator, and dc differs from the pool's Datacenter, the connection
// is dialed through a mesh gateway instead of directly.
func (p *ConnPool) DialTimeout(
	dc string,
	nodeName string,
	addr net.Addr,
	actualRPCType RPCType,
) (net.Conn, HalfCloser, error) {
	p.once.Do(p.init)

	if p.Server && p.GatewayResolver != nil && p.TLSConfigurator != nil && dc != p.Datacenter {
		// NOTE: TLS is required on this branch.
		return DialTimeoutWithRPCTypeViaMeshGateway(
			dc,
			nodeName,
			addr,
			p.SrcAddr,
			p.TLSConfigurator.OutgoingALPNRPCWrapper(),
			actualRPCType,
			RPCTLS,
			// gateway stuff
			p.Server,
			p.TLSConfigurator,
			p.GatewayResolver,
			p.Datacenter,
		)
	}

	return p.dial(
		dc,
		nodeName,
		addr,
		actualRPCType,
		RPCTLS,
	)
}

// dial opens a TCP connection to addr, upgrades it to TLS when the
// TLSConfigurator reports TLS is in use for dc (writing tlsRPCType first),
// and then writes actualRPCType as the protocol type byte unless tlsRPCType
// is RPCTLSInsecure. The returned HalfCloser is the TCP or TLS connection
// when one of those concrete types is in use, and nil otherwise. On any
// failure after the dial succeeds the connection is closed before returning.
func (p *ConnPool) dial(
	dc string,
	nodeName string,
	addr net.Addr,
	actualRPCType RPCType,
	tlsRPCType RPCType,
) (net.Conn, HalfCloser, error) {
	// Try to dial the conn
	d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: defaultDialTimeout}
	conn, err := d.Dial("tcp", addr.String())
	if err != nil {
		return nil, nil, err
	}

	var hc HalfCloser
	if tcp, ok := conn.(*net.TCPConn); ok {
		// Socket tuning is best-effort; a failure here does not make the
		// connection unusable, so the errors are deliberately ignored.
		_ = tcp.SetKeepAlive(true)
		_ = tcp.SetNoDelay(true)

		// Expose TCPConn CloseWrite method on HalfCloser
		hc = tcp
	}

	// Check if TLS is enabled
	if p.TLSConfigurator.UseTLS(dc) {
		wrapper := p.TLSConfigurator.OutgoingRPCWrapper()

		// Switch the connection into TLS mode
		if _, err := conn.Write([]byte{byte(tlsRPCType)}); err != nil {
			conn.Close()
			return nil, nil, err
		}

		// Wrap the connection in a TLS client
		tlsConn, err := wrapper(dc, conn)
		if err != nil {
			conn.Close()
			return nil, nil, err
		}
		conn = tlsConn

		// If this is a tls.Conn, expose HalfCloser to caller
		if tlsConn, ok := conn.(*tls.Conn); ok {
			hc = tlsConn
		}
	}

	// Send the type-byte for the protocol if one is required.
	//
	// When using insecure TLS there is no inner type-byte as these connections
	// aren't wrapped like the standard TLS ones are.
	if tlsRPCType != RPCTLSInsecure {
		if _, err := conn.Write([]byte{byte(actualRPCType)}); err != nil {
			conn.Close()
			return nil, nil, err
		}
	}

	return conn, hc, nil
}

// DialTimeoutWithRPCTypeViaMeshGateway dials the destination node and sets up
// the connection to be the correct RPC type using ALPN. This currently is
// exclusively used to dial other servers in foreign datacenters via mesh
// gateways.
//
// The raw TCP connection to the gateway is closed if the TLS wrapper fails,
// so no socket is leaked on the error path.
//
// NOTE: There is a close mirror of this method in agent/consul/wanfed/wanfed.go:dial
func DialTimeoutWithRPCTypeViaMeshGateway(
	dc string,
	nodeName string,
	addr net.Addr,
	src *net.TCPAddr,
	wrapper tlsutil.ALPNWrapper,
	actualRPCType RPCType,
	tlsRPCType RPCType,
	// gateway stuff
	dialingFromServer bool,
	tlsConfigurator *tlsutil.Configurator,
	gatewayResolver func(string) string,
	thisDatacenter string,
) (net.Conn, HalfCloser, error) {
	// Validate the preconditions in the same order as before so callers see
	// the same error for the same misconfiguration.
	switch {
	case !dialingFromServer:
		return nil, nil, fmt.Errorf("must dial via mesh gateways from a server agent")
	case gatewayResolver == nil:
		return nil, nil, fmt.Errorf("gatewayResolver is nil")
	case tlsConfigurator == nil:
		return nil, nil, fmt.Errorf("tlsConfigurator is nil")
	case dc == thisDatacenter:
		return nil, nil, fmt.Errorf("cannot dial servers in the same datacenter via a mesh gateway")
	case wrapper == nil:
		return nil, nil, fmt.Errorf("cannot dial via a mesh gateway when outgoing TLS is disabled")
	}

	nextProto := actualRPCType.ALPNString()
	if nextProto == "" {
		return nil, nil, fmt.Errorf("rpc type %d cannot be routed through a mesh gateway", actualRPCType)
	}

	gwAddr := gatewayResolver(dc)
	if gwAddr == "" {
		return nil, nil, structs.ErrDCNotAvailable
	}

	dialer := &net.Dialer{LocalAddr: src, Timeout: defaultDialTimeout}

	rawConn, err := dialer.Dial("tcp", gwAddr)
	if err != nil {
		return nil, nil, err
	}

	if tcp, ok := rawConn.(*net.TCPConn); ok {
		_ = tcp.SetKeepAlive(true)
		_ = tcp.SetNoDelay(true)
	}

	// NOTE: now we wrap the connection in a TLS client.
	tlsConn, err := wrapper(dc, nodeName, nextProto, rawConn)
	if err != nil {
		// Mirror dial(): release the socket instead of leaking it.
		rawConn.Close()
		return nil, nil, err
	}

	var conn net.Conn = tlsConn

	var hc HalfCloser
	if tlsConn, ok := conn.(*tls.Conn); ok {
		// Expose *tls.Conn CloseWrite method on HalfCloser
		hc = tlsConn
	}

	return conn, hc, nil
}

// getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr) (*Conn, error) {
	if nodeName == "" {
		return nil, fmt.Errorf("pool: ConnPool.getNewConn requires a node name")
	}

	// Get a new, raw connection and write the Consul multiplex byte to set the mode
	conn, _, err := p.DialTimeout(dc, nodeName, addr, RPCMultiplexV2)
	if err != nil {
		return nil, err
	}

	// Setup the logger
	conf := yamux.DefaultConfig()
	conf.LogOutput = p.LogOutput

	// Create a multiplexed session
	session, _ := yamux.Client(conn, conf)

	// Wrap the connection
	c := &Conn{
		refCount: 1,
		nodeName: nodeName,
		addr:     addr,
		session:  session,
...
rpc.go
Source:rpc.go
...
)

// Max size of a message before we treat the size as invalid
const (
	MaxMessageSize    = 1024 * 1024 * 1024
	leaderDialTimeout = 10 * time.Second
)

// rpc handles request/response style messaging between cluster nodes
type rpc struct {
	logger         *log.Logger
	tracingEnabled bool

	store interface {
		cachedData() *Data
		enableLocalRaft() error
		IsLeader() bool
		Leader() string
		Peers() ([]string, error)
		SetPeers(addrs []string) error
		AddPeer(host string) error
		CreateNode(host string) (*NodeInfo, error)
		NodeByHost(host string) (*NodeInfo, error)
		WaitForDataChanged() error
	}
}

// JoinResult defines the join result structure.
type JoinResult struct {
	RaftEnabled bool
	RaftNodes   []string
	NodeID      uint64
}

// Reply defines the interface for Reply objects.
type Reply interface {
	GetHeader() *internal.ResponseHeader
}

// proxyLeader proxies the connection to the current raft leader. buf is the
// already-read, re-packed request; it is replayed to the leader after the mux
// header byte. Any failure is reported back to conn as an error response.
func (r *rpc) proxyLeader(conn *net.TCPConn, buf []byte) {
	// Read the leader once so the emptiness check and the dial target
	// cannot disagree if leadership changes in between.
	leader := r.store.Leader()
	if leader == "" {
		r.sendError(conn, "no leader detected during proxyLeader")
		return
	}

	leaderConn, err := net.DialTimeout("tcp", leader, leaderDialTimeout)
	if err != nil {
		r.sendError(conn, fmt.Sprintf("dial leader: %v", err))
		return
	}
	defer leaderConn.Close()

	if _, err := leaderConn.Write([]byte{MuxRPCHeader}); err != nil {
		r.sendError(conn, fmt.Sprintf("write leader header: %v", err))
		return
	}

	// re-write the original message to the leader
	if _, err := leaderConn.Write(buf); err != nil {
		r.sendError(conn, fmt.Sprintf("write leader request: %v", err))
		return
	}

	if err := proxy(leaderConn.(*net.TCPConn), conn); err != nil {
		r.sendError(conn, fmt.Sprintf("leader proxy error: %v", err))
	}
}

// handleRPCConn reads a command from the connection and executes it. The
// connection is always closed before returning.
func (r *rpc) handleRPCConn(conn net.Conn) {
	defer conn.Close()
	// RPC connections should execute on the leader. If we are not the leader,
	// proxy the connection to the leader so that clients can connect to any node
	// in the cluster.
	r.traceCluster("rpc connection from: %v", conn.RemoteAddr())

	// Read and execute request.
	typ, buf, err := r.readMessage(conn)
	// Handle unexpected RPC errors
	if err != nil {
		r.sendError(conn, err.Error())
		return
	}

	if !r.store.IsLeader() && typ != internal.RPCType_PromoteRaft {
		// proxyLeader needs a *net.TCPConn; report an error instead of
		// panicking if the connection is some other net.Conn.
		tcpConn, ok := conn.(*net.TCPConn)
		if !ok {
			r.sendError(conn, fmt.Sprintf("cannot proxy non-TCP connection %T to leader", conn))
			return
		}
		r.proxyLeader(tcpConn, pack(typ, buf))
		return
	}

	typ, resp, err := r.executeMessage(conn, typ, buf)
	// Handle unexpected RPC errors
	if err != nil {
		r.sendError(conn, err.Error())
		return
	}

	// Set the status header. err is known to be nil here, so the reply is
	// always marked OK.
	if reply, ok := resp.(Reply); ok {
		reply.GetHeader().OK = proto.Bool(true)
	}

	r.sendResponse(conn, typ, resp)
}

// readMessage reads one size-prefixed message from conn and returns its RPC
// type and payload. The wire format is an 8-byte big-endian size, followed by
// an 8-byte RPC type and the payload (see pack). Sizes too small to hold the
// type field, or at or above MaxMessageSize, are rejected.
func (r *rpc) readMessage(conn net.Conn) (internal.RPCType, []byte, error) {
	// Read request size.
	var sz uint64
	if err := binary.Read(conn, binary.BigEndian, &sz); err != nil {
		return internal.RPCType_Error, nil, fmt.Errorf("read size: %s", err)
	}

	// The message must at least contain the 8-byte RPC type; anything
	// shorter would make the buf[0:8] slice below panic.
	if sz < 8 {
		return internal.RPCType_Error, nil, fmt.Errorf("invalid message size: %d", sz)
	}

	if sz >= MaxMessageSize {
		return internal.RPCType_Error, nil, fmt.Errorf("max message size of %d exceeded: %d", MaxMessageSize, sz)
	}

	// Read request.
	buf := make([]byte, sz)
	if _, err := io.ReadFull(conn, buf); err != nil {
		return internal.RPCType_Error, nil, fmt.Errorf("read request: %s", err)
	}

	// Determine the RPC type
	rpcType := internal.RPCType(btou64(buf[0:8]))
	buf = buf[8:]

	r.traceCluster("recv %v request on: %v", rpcType, conn.RemoteAddr())
	return rpcType, buf, nil
}

// executeMessage unmarshals buf according to rpcType and dispatches it to the
// matching handler. It returns the response type, the response message and
// any error; unknown types and unmarshal failures yield RPCType_Error.
func (r *rpc) executeMessage(conn net.Conn, rpcType internal.RPCType, buf []byte) (internal.RPCType, proto.Message, error) {
	switch rpcType {
	case internal.RPCType_FetchData:
		var req internal.FetchDataRequest
		if err := proto.Unmarshal(buf, &req); err != nil {
			return internal.RPCType_Error, nil, fmt.Errorf("fetch request unmarshal: %v", err)
		}
		resp, err := r.handleFetchData(&req)
		return rpcType, resp, err
	case internal.RPCType_Join:
		var req internal.JoinRequest
		if err := proto.Unmarshal(buf, &req); err != nil {
			return internal.RPCType_Error, nil, fmt.Errorf("join request unmarshal: %v", err)
		}
		resp, err := r.handleJoinRequest(&req)
		return rpcType, resp, err
	case internal.RPCType_PromoteRaft:
		var req internal.PromoteRaftRequest
		if err := proto.Unmarshal(buf, &req); err != nil {
			return internal.RPCType_Error, nil, fmt.Errorf("promote to raft request unmarshal: %v", err)
		}
		resp, err := r.handlePromoteRaftRequest(&req)
		return rpcType, resp, err
	default:
		return internal.RPCType_Error, nil, fmt.Errorf("unknown rpc type:%v", rpcType)
	}
}

// sendResponse marshals resp and writes it to conn framed by pack. Marshal
// and write failures are logged rather than returned.
func (r *rpc) sendResponse(conn net.Conn, typ internal.RPCType, resp proto.Message) {
	// Marshal the response back to a protobuf
	buf, err := proto.Marshal(resp)
	if err != nil {
		r.logger.Printf("unable to marshal response: %v", err)
		return
	}

	// Encode response back to connection.
	if _, err := conn.Write(pack(typ, buf)); err != nil {
		r.logger.Printf("unable to write rpc response: %s", err)
	}
}

// sendError traces msg and sends it to conn as an ErrorResponse whose header
// has OK set to false.
func (r *rpc) sendError(conn net.Conn, msg string) {
	r.traceCluster(msg)
	resp := &internal.ErrorResponse{
		Header: &internal.ResponseHeader{
			OK:    proto.Bool(false),
			Error: proto.String(msg),
		},
	}

	r.sendResponse(conn, internal.RPCType_Error, resp)
}

// handleFetchData handles a request for the current nodes meta data. If the
// cached data's index differs from the requested index the marshaled data is
// returned. Otherwise a non-blocking request returns with no data, and a
// blocking request waits for the store to signal a change and checks again.
func (r *rpc) handleFetchData(req *internal.FetchDataRequest) (*internal.FetchDataResponse, error) {
	var (
		b    []byte
		data *Data
		err  error
	)

	for {
		data = r.store.cachedData()
		if data.Index != req.GetIndex() {
			b, err = data.MarshalBinary()
			if err != nil {
				return nil, err
			}
			break
		}

		if !req.GetBlocking() {
			break
		}

		if err := r.store.WaitForDataChanged(); err != nil {
			return nil, err
		}
	}

	return &internal.FetchDataResponse{
		Header: &internal.ResponseHeader{
			OK: proto.Bool(true),
		},
		Index: proto.Uint64(data.Index),
		Term:  proto.Uint64(data.Term),
		Data:  b}, nil
}

// handleJoinRequest handles a request to join the cluster. It creates the
// node (or looks up the existing one), adds it as a raft peer while there are
// fewer than MaxRaftNodes peers, and replies with the current peer list.
func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinResponse, error) {
	r.traceCluster("join request from: %v", *req.Addr)

	node, err := func() (*NodeInfo, error) {
		// attempt to create the node
		node, err := r.store.CreateNode(*req.Addr)
		// if it exists, return the existing node
		if err == ErrNodeExists {
			node, err = r.store.NodeByHost(*req.Addr)
			if err != nil {
				return node, err
			}
			r.logger.Printf("existing node re-joined: id=%v addr=%v", node.ID, node.Host)
		} else if err != nil {
			return nil, fmt.Errorf("create node: %v", err)
		}

		peers, err := r.store.Peers()
		if err != nil {
			return nil, fmt.Errorf("list peers: %v", err)
		}

		// If we have fewer than MaxRaftNodes peers, add the node as a raft
		// peer if it is not already one
		if len(peers) < MaxRaftNodes && !raft.PeerContained(peers, *req.Addr) {
			r.logger.Printf("adding new raft peer: nodeId=%v addr=%v", node.ID, *req.Addr)
			if err = r.store.AddPeer(*req.Addr); err != nil {
				return node, fmt.Errorf("add peer: %v", err)
			}
		}
		return node, err
	}()

	nodeID := uint64(0)
	if node != nil {
		nodeID = node.ID
	}

	if err != nil {
		return nil, err
	}

	// get the current raft peers
	peers, err := r.store.Peers()
	if err != nil {
		return nil, fmt.Errorf("list peers: %v", err)
	}

	return &internal.JoinResponse{
		Header: &internal.ResponseHeader{
			OK: proto.Bool(true),
		},
		EnableRaft: proto.Bool(raft.PeerContained(peers, *req.Addr)),
		RaftNodes:  peers,
		NodeID:     proto.Uint64(nodeID),
	}, err
}

// handlePromoteRaftRequest handles a request to promote this node to a raft
// peer. It sets the store's peers, enables local raft, and then sets the
// peers again with the requested address included.
func (r *rpc) handlePromoteRaftRequest(req *internal.PromoteRaftRequest) (*internal.PromoteRaftResponse, error) {
	r.traceCluster("promote raft request from: %v", *req.Addr)

	// Need to set the local store peers to match what we are about to join
	if err := r.store.SetPeers(req.RaftNodes); err != nil {
		return nil, err
	}

	if err := r.store.enableLocalRaft(); err != nil {
		return nil, err
	}

	if !contains(req.RaftNodes, *req.Addr) {
		req.RaftNodes = append(req.RaftNodes, *req.Addr)
	}

	if err := r.store.SetPeers(req.RaftNodes); err != nil {
		return nil, err
	}

	return &internal.PromoteRaftResponse{
		Header: &internal.ResponseHeader{
			OK: proto.Bool(true),
		},
		Success: proto.Bool(true),
	}, nil
}

// pack returns a TLV style byte slice encoding the size of the payload, the RPC type
// and the RPC data. The encoded size covers the 8-byte type plus the payload.
func pack(typ internal.RPCType, b []byte) []byte {
	buf := u64tob(uint64(len(b)) + 8)
	buf = append(buf, u64tob(uint64(typ))...)
	buf = append(buf, b...)
	return buf
}

// fetchMetaData returns the latest copy of the meta store data from the current
// leader. It returns (nil, nil) when the leader replies with no data, meaning
// the index we sent already matches the leader's.
func (r *rpc) fetchMetaData(blocking bool) (*Data, error) {
	assert(r.store != nil, "store is nil")

	// Retrieve the current known leader.
	leader := r.store.Leader()
	if leader == "" {
		return nil, errors.New("no leader detected during fetchMetaData")
	}

	var index, term uint64
	data := r.store.cachedData()
	if data != nil {
		index = data.Index
		// Send the cached term, not the index, in the Term field.
		term = data.Term
	}
	resp, err := r.call(leader, &internal.FetchDataRequest{
		Index:    proto.Uint64(index),
		Term:     proto.Uint64(term),
		Blocking: proto.Bool(blocking),
	})
	if err != nil {
		return nil, err
	}

	switch t := resp.(type) {
	case *internal.FetchDataResponse:
		// If data is nil, then the term and index we sent matches the leader
		if t.GetData() == nil {
			return nil, nil
		}
		ms := &Data{}
		if err := ms.UnmarshalBinary(t.GetData()); err != nil {
			return nil, fmt.Errorf("rpc unmarshal metadata: %v", err)
		}
		return ms, nil
	case *internal.ErrorResponse:
		return nil, fmt.Errorf("rpc failed: %s", t.GetHeader().GetError())
	default:
		return nil, fmt.Errorf("rpc failed: unknown response type: %v", t.String())
	}
}

// join attempts to join a cluster at remoteAddr using localAddr as the current
// node's cluster address
func (r *rpc) join(localAddr, remoteAddr string) (*JoinResult, error) {
	req := &internal.JoinRequest{
		Addr: proto.String(localAddr),
	}

	resp, err := r.call(remoteAddr, req)
	if err != nil {
		return nil, err
	}

	switch t := resp.(type) {
	case *internal.JoinResponse:
		return &JoinResult{
			RaftEnabled: t.GetEnableRaft(),
			RaftNodes:   t.GetRaftNodes(),
			NodeID:      t.GetNodeID(),
		}, nil
	case *internal.ErrorResponse:
		return nil, fmt.Errorf("rpc failed: %s", t.GetHeader().GetError())
	default:
		return nil, fmt.Errorf("rpc failed: unknown response type: %v", t.String())
	}
}

// enableRaft attempts to promote the node at addr to a raft peer, sending it
// peers as the raft node list.
func (r *rpc) enableRaft(addr string, peers []string) error {
	req := &internal.PromoteRaftRequest{
		Addr:      proto.String(addr),
		RaftNodes: peers,
	}

	resp, err := r.call(addr, req)
	if err != nil {
		return err
	}

	switch t := resp.(type) {
	case *internal.PromoteRaftResponse:
		return nil
	case *internal.ErrorResponse:
		return fmt.Errorf("rpc failed: %s", t.GetHeader().GetError())
	default:
		return fmt.Errorf("rpc failed: unknown response type: %v", t.String())
	}
}

// call sends an encoded request to the remote leader and returns
// an encoded response value.
func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {
	// Determine type of request
	var rpcType internal.RPCType
	switch t := req.(type) {
	case *internal.JoinRequest:
		rpcType = internal.RPCType_Join
	case *internal.FetchDataRequest:
		rpcType = internal.RPCType_FetchData
	case *internal.PromoteRaftRequest:
		rpcType = internal.RPCType_PromoteRaft
	default:
		return nil, fmt.Errorf("unknown rpc request type: %v", t)
	}

	// Create a connection to the leader.
	conn, err := net.DialTimeout("tcp", dest, leaderDialTimeout)
	if err != nil {
		return nil, fmt.Errorf("rpc dial: %v", err)
	}
	defer conn.Close()

	// Write a marker byte for rpc messages.
	_, err = conn.Write([]byte{MuxRPCHeader})
	if err != nil {
		return nil, err
	}

	b, err := proto.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("rpc marshal: %v", err)
	}

	// Write request size & bytes.
...
Dial
Using AI Code Generation
1import (2type Args struct {3}4type Quotient struct {5}6func main() {7 client, err := rpc.DialHTTP("tcp", "localhost:1234")8 if err != nil {9 log.Fatal("dialing:", err)10 }11 args := Args{17, 8}12 err = client.Call("Arith.Multiply", args, &reply)13 if err != nil {14 log.Fatal("arith error:", err)15 }16 fmt.Printf("Arith: %d*%d=%d17 err = client.Call("Arith.Divide", args, &quo)18 if err != nil {19 log.Fatal("arith error:", err)20 }21 fmt.Printf("Arith: %d/%d=%d remainder %d22}23import (24type Args struct {25}26type Quotient struct {27}28func main() {29 client, err := rpc.DialHTTP("tcp", "localhost:1234")30 if err != nil {31 log.Fatal("dialing:", err)32 }33 args := Args{17, 8}34 quotient := new(Quotient)35 divCall := client.Go("Arith.Divide", args, quotient, nil)36 fmt.Printf("Arith: %d/%d=%d remainder %d37}
Dial
Using AI Code Generation
1import (2func main() {3 client, err := rpc.Dial("tcp", "localhost:1234")4 if err != nil {5 fmt.Println(err)6 }7 err = client.Call("HelloService.Hello", "World", &reply)8 if err != nil {9 fmt.Println(err)10 }11 fmt.Println(reply)12}13import (14func main() {15 client, err := rpc.DialHTTP("tcp", "localhost:1234")16 if err != nil {17 fmt.Println(err)18 }19 err = client.Call("HelloService.Hello", "World", &reply)20 if err != nil {21 fmt.Println(err)22 }23 fmt.Println(reply)24}25import (26func main() {27 client, err := rpc.DialHTTPPath("tcp", "localhost:1234", "/_rpc_")28 if err != nil {29 fmt.Println(err)30 }31 err = client.Call("HelloService.Hello", "World", &reply)32 if err != nil {33 fmt.Println(err)34 }35 fmt.Println(reply)36}37import (38func main() {39 client, err := rpc.DialHTTPConnect("tcp", "localhost:1234")40 if err != nil {41 fmt.Println(err)42 }43 err = client.Call("HelloService.Hello", "World", &reply)44 if err != nil {45 fmt.Println(err)46 }47 fmt.Println(reply)48}49import (50func main() {51 client, err := rpc.DialHTTPConnect("tcp", "localhost:1234")52 if err != nil {53 fmt.Println(err)54 }55 err = client.Call("HelloService.Hello", "World", &reply
Dial
Using AI Code Generation
1import (2func main() {3 client, err := rpc.DialHTTP("tcp", "localhost:1234")4 if err != nil {5 fmt.Println(err)6 }7 err = client.Call("HelloService.Hello", "World", &reply)8 if err != nil {9 fmt.Println(err)10 }11 fmt.Println(reply)12}13import (14func main() {15 client, err := rpc.DialHTTPPath("tcp", "localhost:1234", "/rpc")16 if err != nil {17 fmt.Println(err)18 }19 err = client.Call("HelloService.Hello", "World", &reply)20 if err != nil {21 fmt.Println(err)22 }23 fmt.Println(reply)24}25import (26func main() {27 client, err := rpc.DialHTTPPath("tcp", "localhost:1234", "/rpc")28 if err != nil {29 fmt.Println(err)30 }31 err = client.Call("HelloService.Hello", "World", &reply)32 if err != nil {33 fmt.Println(err)34 }35 fmt.Println(reply)36}37import (38func main() {39 client, err := rpc.DialHTTPPath("tcp", "localhost:1234", "/rpc")40 if err != nil {41 fmt.Println(err)42 }43 err = client.Call("HelloService.Hello", "World", &reply)44 if err != nil {45 fmt.Println(err)46 }47 fmt.Println(reply)48}49import (50func main() {51 client, err := rpc.DialHTTPPath("tcp", "localhost:1234", "/rpc")52 if err != nil {53 fmt.Println(err)54 }55 err = client.Call("HelloService.Hello", "World", &reply)
Dial
Using AI Code Generation
1import (2func main() {3 client, err := rpc.Dial("tcp", "localhost:1234")4 if err != nil {5 log.Fatal("dialing:", err)6 }7 args := &Args{7, 8}8 err = client.Call("Arith.Multiply", args, &reply)9 if err != nil {10 log.Fatal("arith error:", err)11 }12 fmt.Printf("Arith: %d*%d=%d13 quot := new(Quotient)14 divCall := client.Go("Arith.Divide", args, quot, nil)15}16import (17func main() {18 server := rpc.NewServer()19 arith := new(Arith)20 server.Register(arith)21 l, e := net.Listen("tcp", ":1234")22 if e != nil {23 log.Fatal("listen error:", e)24 }25 for {26 conn, err := l.Accept()27 if err != nil {28 }29 go server.ServeConn(conn)30 }31}32import (33func main() {34 server := rpc.NewServer()35 arith := new(Arith)36 server.Register(arith)37 l, e := net.Listen("tcp", ":1234")38 if e != nil {39 log.Fatal("listen error:", e)40 }41 for {42 conn, err := l.Accept()43 if err != nil {44 }45 go server.ServeConn(conn)46 }47}
Dial
Using AI Code Generation
1import (2type Args struct{3}4type Quotient struct{5}6func main() {7 client, err := rpc.DialHTTP("tcp", "localhost:1234")8 if err != nil {9 fmt.Println(err)10 }11 args := Args{17, 8}12 err = client.Call("Arith.Multiply", args, &reply)13 if err != nil {14 fmt.Println(err)15 }16 fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)17 err = client.Call("Arith.Divide", args, ")18 if err != nil {19 fmt.Println(err)20 }21 fmt.Printf("Arith: %d/%d=%d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem)22}23import (24type Args struct{25}26type Quotient struct{27}28func main() {29 client, err := rpc.DialHTTPPath("tcp", "localhost:1234", "/rpc")30 if err != nil {31 fmt.Println(err)32 }33 args := Args{17, 8}34 err = client.Call("Arith.Multiply", args, &reply)35 if err != nil {36 fmt.Println(err)37 }38 fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)39 err = client.Call("Arith.Divide", args, ")40 if err != nil {41 fmt.Println(err)42 }43 fmt.Printf("Arith: %d/%d=%d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem)44}45import (46type Args struct{47}48type Quotient struct{49}50func main() {51 client, err := rpc.DialHTTPPath("tcp", "localhost:1234", "/rpc")
Dial
Using AI Code Generation
1import (2func main() {3 client, err := rpc.Dial("tcp", "localhost:1234")4 if err != nil {5 fmt.Println(err)6 }7 defer client.Close()8 err = client.Call("HelloService.Hello", "world", &reply)9 if err != nil {10 fmt.Println(err)11 }12 fmt.Println(reply)13}14import (15func main() {16 client, err := rpc.DialHTTP("tcp", "localhost:1234")17 if err != nil {18 fmt.Println(err)19 }20 defer client.Close()21 err = client.Call("HelloService.Hello", "world", &reply)22 if err != nil {23 fmt.Println(err)24 }25 fmt.Println(reply)26}27import (28func main() {29 client, err := rpc.DialHTTPPath("tcp", "localhost:1234", "/hello")30 if err != nil {31 fmt.Println(err)32 }33 defer client.Close()34 err = client.Call("HelloService.Hello", "world", &reply)35 if err != nil {36 fmt.Println(err)37 }38 fmt.Println(reply)39}40import (41func main() {42 client, err := rpc.DialHTTPPath("tcp", "localhost:1234", "/hello")43 if err != nil {44 fmt.Println(err)45 }46 defer client.Close()
Dial
Using AI Code Generation
1import (2type Args struct {3}4type Quotient struct {5}6func main() {7 client, err := rpc.DialHTTP("tcp", "
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!