How to use the put method of the conn package

Best Gauge code snippets using conn.put
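Before the full listings, here is a minimal sketch of the Get/Put pattern that the pool tests further down (slice_test.go and list_test.go) exercise. The Config, NewSlice, Get and Put names and the xtime import path are taken from those snippets; the import path of the pool package itself and the dummy connection type are assumptions added for illustration.

package main

import (
	"context"
	"io"
	"time"

	"github.com/zhangjinglei/wahaha/pkg/container/pool" // assumed location of the pool package
	xtime "github.com/zhangjinglei/wahaha/pkg/time"     // path taken from the test files below
)

// dummyConn stands in for a real network connection; the tests below use a
// similar no-op closer.
type dummyConn struct{}

func (dummyConn) Close() error { return nil }

func main() {
	// Field names follow the Config used in the tests below.
	p := pool.NewSlice(&pool.Config{
		Active:      1,
		Idle:        1,
		IdleTimeout: xtime.Duration(90 * time.Second),
	})
	p.New = func(ctx context.Context) (io.Closer, error) {
		// Dial the real connection here.
		return dummyConn{}, nil
	}

	conn, err := p.Get(context.TODO())
	if err != nil {
		panic(err)
	}
	// ... use conn ...

	// Put hands the connection back to the pool; the final argument is the
	// forceClose flag (false keeps the connection for reuse).
	p.Put(context.TODO(), conn, false)
}

The same pattern works against both the slice-backed (NewSlice) and list-backed (NewList) pools shown in the test files below.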

pubconnection.go

Source: pubconnection.go (GitHub)


...16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN19// THE SOFTWARE.20package inputhost21import (22	"sync"23	"sync/atomic"24	"time"25	"github.com/uber-common/bark"26	"github.com/uber/cherami-server/common"27	"github.com/uber/cherami-server/common/metrics"28	"github.com/uber/cherami-server/common/throttler"29	"github.com/uber/cherami-server/services/inputhost/load"30	serverStream "github.com/uber/cherami-server/stream"31	"github.com/uber/cherami-thrift/.generated/go/cherami"32)33// logHighLatencyThreshold is the time threshold for logging34const logHighLatencyThreshold = time.Second35var logThrottler = throttler.New(1, 5*time.Second) // log only once every 5 seconds at max36var logsThrottled int3237type (38	pubConnection struct {39		connID              connectionID40		destinationPath     string41		stream              serverStream.BInOpenPublisherStreamInCall42		logger              bark.Logger43		reconfigureClientCh chan *reconfigInfo44		putMsgCh            chan *inPutMessage45		cacheTimeout        time.Duration46		ackChannel          chan *cherami.PutMessageAck47		replyCh             chan response48		closeChannel        chan struct{} // this is the channel which is used to actually close the stream49		waitWG              sync.WaitGroup50		notifyCloseCh       chan connectionID // this is used to notify the path cache to remove us from its list51		doneCh              chan bool         // this is used to unblock the OpenPublisherStream()52		recvMsgs      int64 // total msgs received53		sentAcks      int64 // total acks sent out54		sentNacks     int64 // total n-acks sent out55		sentThrottled int64 // total msgs that were throttled56		failedMsgs    int64 // total inflight msgs that were 'failed'57		connTokenBucketValue   atomic.Value // Value to controll access for TB for rate limit Num of Msgs received per sec58		connMsgsLimitPerSecond int32        //per second rate limit for this connection59		lk                     sync.Mutex60		opened                 bool61		closed                 bool62		limitsEnabled          bool63		pathCache              *inPathCache64		pathWG                 *sync.WaitGroup65	}66	response struct {67		ackID       string            // this is unique identifier of message68		userContext map[string]string // this is user specified context to pass through69		putMsgRecvTime time.Time // this is the msg receive time, used for latency metrics70	}71	// inPutMessage is the wrapper struct which holds the actual message and72	// the channel to get the reply73	// XXX: Note any changes to this struct should be made on the74	// PutMessageBatch() API in services/inputhost/inpthost.go75	inPutMessage struct {76		putMsg         *cherami.PutMessage77		putMsgAckCh    chan *cherami.PutMessageAck78		putMsgRecvTime time.Time79	}80	earlyReplyAck struct {81		// time when we receive ack from replica82		ackReceiveTime time.Time83		// time when we send ack back to stream84		ackSentTime time.Time85	}86	reconfigInfo struct {87		updateUUID string88		cmdType    cherami.InputHostCommandType89		drainWG    *sync.WaitGroup90	}91	pubConnectionClosedCb func(connectionID)92)93// failTimeout is the timeout to wait for acks from the store when a94// stream is closed95// if we don't get any acks back fail the messages96const failTimeout = 3 * time.Second97// reconfigClientChSize is the size of the reconfigClientCh98const 
reconfigClientChSize = 5099// perConnMsgsLimitPerSecond is the rate limit per connection100const perConnMsgsLimitPerSecond = 10000101func newPubConnection(destinationPath string, stream serverStream.BInOpenPublisherStreamInCall, pathCache *inPathCache, m3Client metrics.Client, limitsEnabled bool, timeout time.Duration, doneCh chan bool) *pubConnection {102	conn := &pubConnection{103		connID:          pathCache.currID,104		destinationPath: destinationPath,105		logger: pathCache.logger.WithFields(bark.Fields{106			common.TagInPubConnID: common.FmtInPubConnID(int(pathCache.currID)),107			common.TagModule:      `pubConn`,108		}),109		stream:       stream,110		putMsgCh:     pathCache.putMsgCh,111		cacheTimeout: timeout,112		//perConnTokenBucket:  common.NewTokenBucket(perConnMsgsLimitPerSecond, common.NewRealTimeSource()),113		replyCh:             make(chan response, defaultBufferSize),114		reconfigureClientCh: make(chan *reconfigInfo, reconfigClientChSize),115		ackChannel:          make(chan *cherami.PutMessageAck, defaultBufferSize),116		closeChannel:        make(chan struct{}),117		notifyCloseCh:       pathCache.notifyConnsCloseCh,118		doneCh:              doneCh,119		limitsEnabled:       limitsEnabled,120		pathCache:           pathCache,121		pathWG:              &pathCache.connsWG,122	}123	conn.SetMsgsLimitPerSecond(common.HostPerConnMsgsLimitPerSecond)124	return conn125}126func (conn *pubConnection) open() {127	conn.lk.Lock()128	defer conn.lk.Unlock()129	if !conn.opened {130		conn.waitWG.Add(2)131		conn.pathWG.Add(1) // this makes the manage routine in the pathCache is alive132		go conn.readRequestStream()133		go conn.writeAcksStream()134		conn.opened = true135		conn.logger.Info("pubConn opened")136	}137}138func (conn *pubConnection) close() {139	conn.lk.Lock()140	if conn.closed {141		conn.lk.Unlock()142		return143	}144	close(conn.closeChannel)145	conn.closed = true146	conn.waitWG.Wait()147	// we have successfully closed the connection148	// make sure we update the ones who are waiting for us149	select {150	case conn.doneCh <- true:151	default:152	}153	conn.lk.Unlock()154	// notify the patch cache to remove this conn155	// from the cache. No need to hold the lock156	// for this.157	conn.notifyCloseCh <- conn.connID158	// set the wait group for the pathCache to be done159	conn.pathWG.Done()160	conn.logger.WithFields(bark.Fields{161		`sentAcks`:      conn.sentAcks,162		`sentNacks`:     conn.sentNacks,163		`sentThrottled`: conn.sentThrottled,164		`failedMsgs`:    conn.failedMsgs,165	}).Info("pubConn closed")166}167// readRequestStream is the pump which reads messages from the client stream.168// this sends the message to the next layer (extHost) and then also prepares169// the writeAcksStream by sending a message to the intermediary "replyCh"170func (conn *pubConnection) readRequestStream() {171	defer conn.waitWG.Done()172	var msgLength int64173	// Setup the connIdleTimer174	connIdleTimer := common.NewTimer(conn.cacheTimeout)175	defer connIdleTimer.Stop()176	for {177		connIdleTimer.Reset(conn.cacheTimeout)178		select {179		case <-connIdleTimer.C:180			conn.logger.WithField(`cacheTimeout`, conn.cacheTimeout).Info(`client connection idle for : ; closing it`)181			go conn.close()182			return183		default:184			msg, err := conn.stream.Read()185			if err != nil {186				conn.logger.WithField(common.TagErr, err).Info(`inputhost: PublisherStream closed on read error`)187				// conn.close() can block waiting for all routines to188				// go away. 
make sure we don't deadlock189				go conn.close()190				return191			}192			// record the counter metric193			conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageReceived)194			conn.pathCache.destM3Client.IncCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageReceived)195			conn.pathCache.dstMetrics.Increment(load.DstMetricOverallNumMsgs)196			// Note: we increment the destination bytes in counter here because we could throttle this message197			// even before it reaches any of the extents (which increments the extent specific bytes in counter)198			msgLength = int64(len(msg.Data))199			conn.pathCache.dstMetrics.Add(load.DstMetricBytesIn, msgLength)200			conn.pathCache.hostMetrics.Add(load.HostMetricBytesIn, msgLength)201			conn.pathCache.destM3Client.AddCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageReceivedBytes, msgLength)202			conn.recvMsgs++203			inMsg := &inPutMessage{204				putMsg:         msg,205				putMsgAckCh:    conn.ackChannel,206				putMsgRecvTime: time.Now(),207			}208			throttled := false209			if conn.limitsEnabled {210				consumed, _ := conn.GetConnTokenBucketValue().TryConsume(1)211				throttled = !consumed212				if throttled {213					// just send a THROTTLED status back to the client214					conn.logger.Warn("throttling due to rate violation")215					conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageLimitThrottled)216					conn.pathCache.destM3Client.IncCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageLimitThrottled)217					inMsg.putMsgAckCh <- &cherami.PutMessageAck{218						ID:          common.StringPtr(msg.GetID()),219						UserContext: msg.GetUserContext(),220						Status:      common.CheramiStatusPtr(cherami.Status_THROTTLED),221						Message:     common.StringPtr("throttling; inputhost is busy"),222					}223				}224			}225			if !throttled {226				if conn.limitsEnabled {227					// if sending to this channel is blocked we need to return a throttle error to the client228					select {229					case conn.putMsgCh <- inMsg:230						// populate the inflight map231						conn.replyCh <- response{msg.GetID(), msg.GetUserContext(), inMsg.putMsgRecvTime}232					default:233						conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageChannelFullThrottled)234						conn.pathCache.destM3Client.IncCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageChannelFullThrottled)235						// just send a THROTTLED status back to the client236						conn.logger.Warn("throttling due to putMsgCh being filled")237						inMsg.putMsgAckCh <- &cherami.PutMessageAck{238							ID:          common.StringPtr(msg.GetID()),239							UserContext: msg.GetUserContext(),240							Status:      common.CheramiStatusPtr(cherami.Status_THROTTLED),241							Message:     common.StringPtr("throttling; inputhost is busy"),242						}243					}244				} else {245					select {246					case conn.putMsgCh <- inMsg:247						conn.replyCh <- response{msg.GetID(), msg.GetUserContext(), inMsg.putMsgRecvTime}248					case <-conn.closeChannel:249						// we are shutting down here. just return250						return251					}252				}253			}254		}255	}256}257// writeAcksStream is the pump which sends acks back to the client.258// we read from the intermediary "replyCh", and populate the259// "inflightMessages" map. this is done to serve 2 purposes.260// 1. read from ackChannel only of necessary261// 2. 
make sure we respond failure back to the client in case something262//    happens and we close the streams underneath263func (conn *pubConnection) writeAcksStream() {264	earlyReplyAcks := make(map[string]earlyReplyAck) // map of all the early acks265	inflightMessages := make(map[string]response)266	defer conn.failInflightMessages(inflightMessages, earlyReplyAcks)267	flushTicker := time.NewTicker(common.FlushTimeout) // start ticker to flush tchannel stream268	defer flushTicker.Stop()269	unflushedWrites := 0270	for {271		select {272		case resCh := <-conn.replyCh:273			// First check if we have already seen the ack for this ID274			if _, ok := earlyReplyAcks[resCh.ackID]; ok {275				// We already received the ack for this msgID.  Complete the request immediately.276				conn.updateEarlyReplyAcks(resCh, earlyReplyAcks)277			} else {278				inflightMessages[resCh.ackID] = resCh279			}280		case <-flushTicker.C:281			if unflushedWrites > 0 {282				if err := conn.flushCmdToClient(unflushedWrites); err != nil {283					// since flush failed, trigger a close of the connection which will fail inflight messages284					go conn.close()285				}286				unflushedWrites = 0287			}288		default:289			if len(inflightMessages) == 0 {290				select {291				case resCh := <-conn.replyCh:292					// First check if we have already seen the ack for this ID293					if _, ok := earlyReplyAcks[resCh.ackID]; ok {294						// We already received the ack for this msgID.  Complete the request immediately.295						conn.updateEarlyReplyAcks(resCh, earlyReplyAcks)296					} else {297						inflightMessages[resCh.ackID] = resCh298					}299				case rInfo := <-conn.reconfigureClientCh:300					// record the counter metric301					conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostReconfClientRequests)302					cmd := createReconfigureCmd(rInfo)303					if err := conn.writeCmdToClient(cmd, rInfo.drainWG); err != nil {304						// trigger a close of the connection305						go conn.close()306						return307					}308					unflushedWrites++309					// we will flush this in our next interval. 
Since this is just a reconfig310				case <-flushTicker.C:311					if unflushedWrites > 0 {312						if err := conn.flushCmdToClient(unflushedWrites); err != nil {313							// since flush failed, trigger a close of the connection which will fail inflight messages314							go conn.close()315							return316						}317						unflushedWrites = 0318					}319				case <-conn.closeChannel:320					return321				}322			} else {323				select {324				case ack, ok := <-conn.ackChannel:325					if ok {326						ackReceiveTime := time.Now()327						exists, err := conn.writeAckToClient(inflightMessages, ack, ackReceiveTime)328						if err != nil {329							// trigger a close of the connection330							go conn.close()331							return332						}333						if !exists {334							// we received an ack even before we populated the inflight map335							// put it in the earlyReplyAcks and remove it when we get the inflight map336							// XXX: log disabled to reduce spew337							// conn.logger.338							//	WithField(common.TagInPutAckID, common.FmtInPutAckID(ack.GetID())).339							//	Debug("received an ack even before we populated the inflight map")340							earlyReplyAcks[ack.GetID()] = earlyReplyAck{341								ackReceiveTime: ackReceiveTime,342								ackSentTime:    time.Now(),343							}344						}345						unflushedWrites++346						if unflushedWrites > common.FlushThreshold {347							if err = conn.flushCmdToClient(unflushedWrites); err != nil {348								// since flush failed, trigger a close of the connection which will fail inflight messages349								go conn.close()350								return351							}352							unflushedWrites = 0353						}354					} else {355						return356					}357				case rInfo := <-conn.reconfigureClientCh:358					// record the counter metric359					conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostReconfClientRequests)360					cmd := createReconfigureCmd(rInfo)361					if err := conn.writeCmdToClient(cmd, rInfo.drainWG); err != nil {362						// trigger a close of the connection363						go conn.close()364						return365					}366					unflushedWrites++367					// we will flush this in our next interval. 
Since this is just a reconfig368				case <-flushTicker.C:369					if unflushedWrites > 0 {370						if err := conn.flushCmdToClient(unflushedWrites); err != nil {371							// since flush failed, trigger a close of the connection which will fail inflight messages372							go conn.close()373							return374						}375						unflushedWrites = 0376					}377				case <-conn.closeChannel:378					return379				}380			}381		}382	}383}384func (conn *pubConnection) failInflightMessages(inflightMessages map[string]response, earlyReplyAcks map[string]earlyReplyAck) {385	defer conn.stream.Done()386	failTimer := common.NewTimer(failTimeout)387	defer failTimer.Stop()388	// make sure we wait for all the messages for some timeout period and fail only if necessary389	// we only iterate through the inflightMessages map because the earlyAcksMap is390	// only there for updating metrics properly and since we are failing here, we don't care.391	for quit := false; !quit && len(inflightMessages) > 0; {392		select {393		case ack, ok := <-conn.ackChannel:394			if ok {395				conn.writeAckToClient(inflightMessages, ack, time.Now())396				// ignore error above since we are anyway failing397				// Since we are anyway failing here, we don't care about the398				// early acks map since the only point for that map is to399				// update metric400			}401		case resCh := <-conn.replyCh:402			// First check if we have already seen the ack for this ID403			// We do the check here to make sure we don't incorrectly populate404			// the inflight messages map and timeout those messages down below405			// after out failTimeout elapses.406			// This situation can happen, if we just sent an ack above in the normal407			// path and have not yet populated the infight map and closed the connection408			if _, ok := earlyReplyAcks[resCh.ackID]; ok {409				// We already received the ack for this msgID.  
Complete the request immediately.410				conn.updateEarlyReplyAcks(resCh, earlyReplyAcks)411			} else {412				inflightMessages[resCh.ackID] = resCh413			}414		case <-failTimer.C:415			conn.logger.WithField(`inflightMessages`, len(inflightMessages)).Info(`inputhost: timing out messages`)416			quit = true417		}418	}419	// send a failure to all the remaining inflight messages420	for id, resp := range inflightMessages {421		if _, ok := earlyReplyAcks[id]; !ok {422			putMsgAck := &cherami.PutMessageAck{423				ID:          common.StringPtr(id),424				UserContext: resp.userContext,425				Status:      common.CheramiStatusPtr(cherami.Status_FAILED),426				Message:     common.StringPtr("inputhost: timing out unacked message"),427			}428			d := time.Since(resp.putMsgRecvTime)429			conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageBeforeAckLatency, d)430			conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageBeforeAckLatency, d)431			conn.stream.Write(createAckCmd(putMsgAck))432			d = time.Since(resp.putMsgRecvTime)433			conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageLatency, d)434			conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageLatency, d)435			if d > logHighLatencyThreshold {436				if logThrottler.Allow() {437					conn.logger.WithFields(bark.Fields{438						common.TagDstPth:     common.FmtDstPth(conn.destinationPath),439						common.TagInPutAckID: common.FmtInPutAckID(id),440						`duration`:           d,441						`putMsgChanLen`:      len(conn.putMsgCh),442						`putMsgAckChanLen`:   len(conn.ackChannel),443						`replyChanLen`:       len(conn.replyCh),444						`throttled`:          atomic.SwapInt32(&logsThrottled, 0),445					}).Error(`failInflightMessages: publish message latency`)446				} else {447					atomic.AddInt32(&logsThrottled, 1)448				}449			}450			// Record the number of failed messages451			conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageFailures)452			conn.pathCache.destM3Client.IncCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageFailures)453			conn.failedMsgs++454			conn.pathCache.dstMetrics.Increment(load.DstMetricNumFailed)455		}456	}457	// flush whatever we have458	conn.stream.Flush()459	conn.waitWG.Done()460}461func (conn *pubConnection) flushCmdToClient(unflushedWrites int) (err error) {462	if err = conn.stream.Flush(); err != nil {463		conn.logger.WithFields(bark.Fields{common.TagErr: err, `unflushedWrites`: unflushedWrites}).Error(`inputhost: error flushing messages to client stream failed`)464		// since flush failed, trigger a close of the connection which will fail inflight messages465	}466	return467}468func (conn *pubConnection) writeCmdToClient(cmd *cherami.InputHostCommand, drainWG *sync.WaitGroup) (err error) {469	if err = conn.stream.Write(cmd); err != nil {470		conn.logger.WithFields(bark.Fields{`cmd`: cmd, common.TagErr: err}).Info(`inputhost: Unable to Write cmd back to client`)471	}472	if cmd.GetType() == cherami.InputHostCommandType_DRAIN {473		// if this is a DRAIN command wait for some timeout period and then474		// just close the connection475		// We do this to make sure the connection object doesn't hang around476		// at this point we have sent the DRAIN command to the client477		// XXX:  assert non-nil WG478		go conn.waitForDrain(drainWG)479	}480	return481}482// waitForDrain is a safety routine to make 
sure we don't hold onto the connection object483// In a normal scenario, when all the extents hosted on this inputhost for the destination is484// drained properly, it will automatically trigger a connection close, which will clean up the485// connection object.486// But if another extent gets assigned to this inputhost before the drain of an older extent487// completes, then the pathCache will not unload which means this object won't be closed, while the488// client would have already stopped writing on this connection. In that case, we will hit the489// timeout and close the object anyway.490func (conn *pubConnection) waitForDrain(drainWG *sync.WaitGroup) {491	// wait for some time to set the WG to be done.492	connWGTimer := common.NewTimer(connWGTimeout)493	defer connWGTimer.Stop()494	select {495	case <-conn.closeChannel:496		drainWG.Done()497		return498	case <-connWGTimer.C:499		drainWG.Done()500	}501	// now the WG is *unblocked*; wait for the actual drain502	// from the exthost layer to finish for a timeout period503	drainTimer := common.NewTimer(defaultDrainTimeout)504	defer drainTimer.Stop()505	// just wait for a minute, for all the inflight messages to drain506	select {507	case <-conn.closeChannel:508		// if it is closed already, just return509		return510	case <-drainTimer.C:511		conn.logger.Warn("timed out waiting for drain to finish. just closing the connection")512		go conn.close()513	}514}515func (conn *pubConnection) writeAckToClient(inflightMessages map[string]response, ack *cherami.PutMessageAck, ackReceiveTime time.Time) (exists bool, err error) {516	cmd := createAckCmd(ack)517	err = conn.writeCmdToClient(cmd, nil)518	if err != nil {519		conn.logger.520			WithField(common.TagInPutAckID, common.FmtInPutAckID(ack.GetID())).521			WithField(common.TagErr, err).Error(`inputhost: writing ack Id failed`)522	}523	exists = conn.updateInflightMap(inflightMessages, ack.GetID(), ackReceiveTime)524	// update the failure metric, if needed525	if ack.GetStatus() == cherami.Status_FAILED || err != nil {526		conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageFailures)527		conn.pathCache.destM3Client.IncCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageFailures)528	}529	if err == nil {530		switch ack.GetStatus() {531		case cherami.Status_OK:532			conn.sentAcks++533			conn.pathCache.dstMetrics.Increment(load.DstMetricNumAcks)534		case cherami.Status_FAILED:535			conn.sentNacks++536			conn.pathCache.dstMetrics.Increment(load.DstMetricNumNacks)537		case cherami.Status_THROTTLED:538			conn.sentThrottled++539			conn.pathCache.dstMetrics.Increment(load.DstMetricNumThrottled)540		}541	}542	return543}544func (conn *pubConnection) updateInflightMap(inflightMessages map[string]response, ackID string, ackReceiveTime time.Time) bool {545	if resp, ok := inflightMessages[ackID]; ok {546		// record the latency547		d := time.Since(resp.putMsgRecvTime)548		conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageLatency, d)549		conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageLatency, d)550		conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageBeforeAckLatency, ackReceiveTime.Sub(resp.putMsgRecvTime))551		conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageBeforeAckLatency, ackReceiveTime.Sub(resp.putMsgRecvTime))552		if d > logHighLatencyThreshold {553			if 
logThrottler.Allow() {554				conn.logger.WithFields(bark.Fields{555					common.TagDstPth:     common.FmtDstPth(conn.destinationPath),556					common.TagInPutAckID: common.FmtInPutAckID(ackID),557					`d`:                  d,558					`putMsgChanLen`:      len(conn.putMsgCh),559					`putMsgAckChanLen`:   len(conn.ackChannel),560					`replyChanLen`:       len(conn.replyCh),561					`throttled`:          atomic.SwapInt32(&logsThrottled, 0),562				}).Info(`publish message latency at updateInflightMap`)563			} else {564				atomic.AddInt32(&logsThrottled, 1)565			}566		}567		delete(inflightMessages, ackID)568		return ok569	}570	// didn't find it in the inflight map, which means we got an ack even before we populated it571	return false572}573func (conn *pubConnection) updateEarlyReplyAcks(resCh response, earlyReplyAcks map[string]earlyReplyAck) {574	// make sure we account for the time when we sent the ack as well575	d := time.Since(resCh.putMsgRecvTime)576	ack, _ := earlyReplyAcks[resCh.ackID]577	actualDuration := d - time.Since(ack.ackSentTime)578	if d > logHighLatencyThreshold {579		if logThrottler.Allow() {580			conn.logger.WithFields(bark.Fields{581				common.TagDstPth:     common.FmtDstPth(conn.destinationPath),582				common.TagInPutAckID: common.FmtInPutAckID(resCh.ackID),583				`d`:                  d,584				`actualDuration`:     actualDuration,585				`putMsgChanLen`:      len(conn.putMsgCh),586				`putMsgAckChanLen`:   len(conn.ackChannel),587				`replyChanLen`:       len(conn.replyCh),588				`throttled`:          atomic.SwapInt32(&logsThrottled, 0),589			}).Info(`publish message latency at updateEarlyReplyAcks and actualDuration`)590		} else {591			atomic.AddInt32(&logsThrottled, 1)592		}593	}594	conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageLatency, actualDuration)595	conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageLatency, actualDuration)596	// also record the time that excludes the time for sending ack back to socket597	actualDurationExcludeSocket := d - time.Since(ack.ackReceiveTime)598	conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageBeforeAckLatency, actualDurationExcludeSocket)599	conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageBeforeAckLatency, actualDurationExcludeSocket)600	delete(earlyReplyAcks, resCh.ackID)601	// XXX: Disabled due to log noise602	// conn.logger.WithField(common.TagInPutAckID, common.FmtInPutAckID(resCh.ackID)).Info("Found ack for this response in earlyReplyAcks map. 
Not adding it to the inflight map")603}604func createReconfigureCmd(rInfo *reconfigInfo) *cherami.InputHostCommand {605	cmd := cherami.NewInputHostCommand()606	cmd.Reconfigure = &cherami.ReconfigureInfo{UpdateUUID: common.StringPtr(rInfo.updateUUID)}607	cmd.Type = common.CheramiInputHostCommandTypePtr(rInfo.cmdType)608	return cmd609}610func createAckCmd(ack *cherami.PutMessageAck) *cherami.InputHostCommand {611	cmd := cherami.NewInputHostCommand()612	cmd.Ack = ack613	cmd.Type = common.CheramiInputHostCommandTypePtr(cherami.InputHostCommandType_ACK)614	return cmd615}616// GetMsgsLimitPerSecond gets msgs rate limit per second for this connection617func (conn *pubConnection) GetMsgsLimitPerSecond() int {618	return int(atomic.LoadInt32(&conn.connMsgsLimitPerSecond))619}620// SetMsgsLimitPerSecond sets msgs rate limit per second for this connection621func (conn *pubConnection) SetMsgsLimitPerSecond(connLimit int32) {622	atomic.StoreInt32(&conn.connMsgsLimitPerSecond, connLimit)623	conn.SetConnTokenBucketValue(int32(connLimit))624}625// GetConnTokenBucketValue gets token bucket for connMsgsLimitPerSecond626func (conn *pubConnection) GetConnTokenBucketValue() common.TokenBucket {627	return conn.connTokenBucketValue.Load().(common.TokenBucket)...
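The read pump above hands each message to the next layer over conn.putMsgCh and expects the acknowledgement to come back on the per-connection ackChannel (carried inside each inPutMessage as putMsgAckCh). As a rough sketch, not the actual exthost code from this repository, a downstream consumer of that channel could ack a message like this; the drainPutMsgCh name is hypothetical, while the types and helpers are the ones used in the snippet above:

package inputhost

import (
	"github.com/uber/cherami-server/common"
	"github.com/uber/cherami-thrift/.generated/go/cherami"
)

// drainPutMsgCh is a hypothetical downstream pump: it consumes messages that
// readRequestStream put on putMsgCh and acknowledges each one through its
// putMsgAckCh, which writeAcksStream then relays back to the client stream.
func drainPutMsgCh(putMsgCh <-chan *inPutMessage, stop <-chan struct{}) {
	for {
		select {
		case inMsg := <-putMsgCh:
			// Persist inMsg.putMsg somewhere (extent/replica write), then ack.
			inMsg.putMsgAckCh <- &cherami.PutMessageAck{
				ID:          common.StringPtr(inMsg.putMsg.GetID()),
				UserContext: inMsg.putMsg.GetUserContext(),
				Status:      common.CheramiStatusPtr(cherami.Status_OK),
			}
		case <-stop:
			return
		}
	}
}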


slice_test.go

Source: slice_test.go (GitHub)


1package pool2import (3	"context"4	"io"5	"testing"6	"time"7	xtime "github.com/zhangjinglei/wahaha/pkg/time"8	"github.com/stretchr/testify/assert"9)10type closer struct {11}12func (c *closer) Close() error {13	return nil14}15type connection struct {16	c    io.Closer17	pool Pool18}19func (c *connection) HandleQuick() {20	//	time.Sleep(1 * time.Millisecond)21}22func (c *connection) HandleNormal() {23	time.Sleep(20 * time.Millisecond)24}25func (c *connection) HandleSlow() {26	time.Sleep(500 * time.Millisecond)27}28func (c *connection) Close() {29	c.pool.Put(context.Background(), c.c, false)30}31func TestSliceGetPut(t *testing.T) {32	// new pool33	config := &Config{34		Active:      1,35		Idle:        1,36		IdleTimeout: xtime.Duration(90 * time.Second),37		WaitTimeout: xtime.Duration(10 * time.Millisecond),38		Wait:        false,39	}40	pool := NewSlice(config)41	pool.New = func(ctx context.Context) (io.Closer, error) {42		return &closer{}, nil43	}44	// test Get Put45	conn, err := pool.Get(context.TODO())46	assert.Nil(t, err)47	c1 := connection{pool: pool, c: conn}48	c1.HandleNormal()49	c1.Close()50}51func TestSlicePut(t *testing.T) {52	var id = 053	type connID struct {54		io.Closer55		id int56	}57	config := &Config{58		Active:      1,59		Idle:        1,60		IdleTimeout: xtime.Duration(1 * time.Second),61		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),62		Wait: false,63	}64	pool := NewSlice(config)65	pool.New = func(ctx context.Context) (io.Closer, error) {66		id = id + 167		return &connID{id: id, Closer: &closer{}}, nil68	}69	// test Put(ctx, conn, true)70	conn, err := pool.Get(context.TODO())71	assert.Nil(t, err)72	conn1 := conn.(*connID)73	// Put(ctx, conn, true) drop the connection.74	pool.Put(context.TODO(), conn, true)75	conn, err = pool.Get(context.TODO())76	assert.Nil(t, err)77	conn2 := conn.(*connID)78	assert.NotEqual(t, conn1.id, conn2.id)79}80func TestSliceIdleTimeout(t *testing.T) {81	var id = 082	type connID struct {83		io.Closer84		id int85	}86	config := &Config{87		Active: 1,88		Idle:   1,89		// conn timeout90		IdleTimeout: xtime.Duration(1 * time.Millisecond),91	}92	pool := NewSlice(config)93	pool.New = func(ctx context.Context) (io.Closer, error) {94		id = id + 195		return &connID{id: id, Closer: &closer{}}, nil96	}97	// test Put(ctx, conn, true)98	conn, err := pool.Get(context.TODO())99	assert.Nil(t, err)100	conn1 := conn.(*connID)101	// Put(ctx, conn, true) drop the connection.102	pool.Put(context.TODO(), conn, false)103	time.Sleep(5 * time.Millisecond)104	// idletimeout and get new conn105	conn, err = pool.Get(context.TODO())106	assert.Nil(t, err)107	conn2 := conn.(*connID)108	assert.NotEqual(t, conn1.id, conn2.id)109}110func TestSliceContextTimeout(t *testing.T) {111	// new pool112	config := &Config{113		Active:      1,114		Idle:        1,115		IdleTimeout: xtime.Duration(90 * time.Second),116		WaitTimeout: xtime.Duration(10 * time.Millisecond),117		Wait:        false,118	}119	pool := NewSlice(config)120	pool.New = func(ctx context.Context) (io.Closer, error) {121		return &closer{}, nil122	}123	// test context timeout124	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)125	defer cancel()126	conn, err := pool.Get(ctx)127	assert.Nil(t, err)128	_, err = pool.Get(ctx)129	// context timeout error130	assert.NotNil(t, err)131	pool.Put(context.TODO(), conn, false)132	_, err = pool.Get(ctx)133	assert.Nil(t, err)134}135func TestSlicePoolExhausted(t *testing.T) {136	// test pool exhausted137	config := &Config{138		Active:      1,139		Idle:        1,140		
IdleTimeout: xtime.Duration(90 * time.Second),141		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),142		Wait: false,143	}144	pool := NewSlice(config)145	pool.New = func(ctx context.Context) (io.Closer, error) {146		return &closer{}, nil147	}148	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)149	defer cancel()150	conn, err := pool.Get(context.TODO())151	assert.Nil(t, err)152	_, err = pool.Get(ctx)153	// config active == 1, so no avaliable conns make connection exhausted.154	assert.NotNil(t, err)155	pool.Put(context.TODO(), conn, false)156	_, err = pool.Get(ctx)157	assert.Nil(t, err)158}159func TestSliceStaleClean(t *testing.T) {160	var id = 0161	type connID struct {162		io.Closer163		id int164	}165	config := &Config{166		Active:      1,167		Idle:        1,168		IdleTimeout: xtime.Duration(1 * time.Second),169		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),170		Wait: false,171	}172	pool := NewList(config)173	pool.New = func(ctx context.Context) (io.Closer, error) {174		id = id + 1175		return &connID{id: id, Closer: &closer{}}, nil176	}177	conn, err := pool.Get(context.TODO())178	assert.Nil(t, err)179	conn1 := conn.(*connID)180	pool.Put(context.TODO(), conn, false)181	conn, err = pool.Get(context.TODO())182	assert.Nil(t, err)183	conn2 := conn.(*connID)184	assert.Equal(t, conn1.id, conn2.id)185	pool.Put(context.TODO(), conn, false)186	// sleep more than idleTimeout187	time.Sleep(2 * time.Second)188	conn, err = pool.Get(context.TODO())189	assert.Nil(t, err)190	conn3 := conn.(*connID)191	assert.NotEqual(t, conn1.id, conn3.id)192}193func BenchmarkSlice1(b *testing.B) {194	config := &Config{195		Active:      30,196		Idle:        30,197		IdleTimeout: xtime.Duration(90 * time.Second),198		WaitTimeout: xtime.Duration(10 * time.Millisecond),199		Wait:        false,200	}201	pool := NewSlice(config)202	pool.New = func(ctx context.Context) (io.Closer, error) {203		return &closer{}, nil204	}205	b.ResetTimer()206	b.RunParallel(func(pb *testing.PB) {207		for pb.Next() {208			conn, err := pool.Get(context.TODO())209			if err != nil {210				b.Error(err)211				continue212			}213			c1 := connection{pool: pool, c: conn}214			c1.HandleQuick()215			c1.Close()216		}217	})218}219func BenchmarkSlice2(b *testing.B) {220	config := &Config{221		Active:      30,222		Idle:        30,223		IdleTimeout: xtime.Duration(90 * time.Second),224		WaitTimeout: xtime.Duration(10 * time.Millisecond),225		Wait:        false,226	}227	pool := NewSlice(config)228	pool.New = func(ctx context.Context) (io.Closer, error) {229		return &closer{}, nil230	}231	b.ResetTimer()232	b.RunParallel(func(pb *testing.PB) {233		for pb.Next() {234			conn, err := pool.Get(context.TODO())235			if err != nil {236				b.Error(err)237				continue238			}239			c1 := connection{pool: pool, c: conn}240			c1.HandleNormal()241			c1.Close()242		}243	})244}245func BenchmarkSlice3(b *testing.B) {246	config := &Config{247		Active:      30,248		Idle:        30,249		IdleTimeout: xtime.Duration(90 * time.Second),250		WaitTimeout: xtime.Duration(10 * time.Millisecond),251		Wait:        false,252	}253	pool := NewSlice(config)254	pool.New = func(ctx context.Context) (io.Closer, error) {255		return &closer{}, nil256	}257	b.ResetTimer()258	b.RunParallel(func(pb *testing.PB) {259		for pb.Next() {260			conn, err := pool.Get(context.TODO())261			if err != nil {262				b.Error(err)263				continue264			}265			c1 := connection{pool: pool, c: conn}266			c1.HandleSlow()267			c1.Close()268		}269	})270}271func BenchmarkSlice4(b *testing.B) {272	
config := &Config{273		Active:      30,274		Idle:        30,275		IdleTimeout: xtime.Duration(90 * time.Second),276		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),277		Wait: false,278	}279	pool := NewSlice(config)280	pool.New = func(ctx context.Context) (io.Closer, error) {281		return &closer{}, nil282	}283	b.ResetTimer()284	b.RunParallel(func(pb *testing.PB) {285		for pb.Next() {286			conn, err := pool.Get(context.TODO())287			if err != nil {288				b.Error(err)289				continue290			}291			c1 := connection{pool: pool, c: conn}292			c1.HandleSlow()293			c1.Close()294		}295	})296}297func BenchmarkSlice5(b *testing.B) {298	config := &Config{299		Active:      30,300		Idle:        30,301		IdleTimeout: xtime.Duration(90 * time.Second),302		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),303		Wait: true,304	}305	pool := NewSlice(config)306	pool.New = func(ctx context.Context) (io.Closer, error) {307		return &closer{}, nil308	}309	b.ResetTimer()310	b.RunParallel(func(pb *testing.PB) {311		for pb.Next() {312			conn, err := pool.Get(context.TODO())313			if err != nil {314				b.Error(err)315				continue316			}317			c1 := connection{pool: pool, c: conn}318			c1.HandleSlow()319			c1.Close()320		}321	})322}...
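One detail worth highlighting from these tests: the third argument to Put is a forceClose flag. TestSlicePut above shows that Put(ctx, conn, true) drops the connection (the next Get dials a new one with a different id), while Put(ctx, conn, false) returns it to the idle list until IdleTimeout expires (TestSliceIdleTimeout, TestSliceStaleClean). A small sketch of that decision, with a hypothetical helper name and an assumed import path for the pool package:

package poolutil // hypothetical helper package

import (
	"context"
	"io"

	"github.com/zhangjinglei/wahaha/pkg/container/pool" // assumed import path
)

// release hands a connection back to the pool, discarding it when the caller
// hit an error while using it.
func release(ctx context.Context, p pool.Pool, c io.Closer, usageErr error) {
	if usageErr != nil {
		// forceClose=true: drop the broken connection; the next Get creates a
		// fresh one (this is what TestSlicePut asserts via the changing id).
		_ = p.Put(ctx, c, true)
		return
	}
	// forceClose=false: return it for reuse until IdleTimeout cleans it up.
	_ = p.Put(ctx, c, false)
}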


list_test.go

Source: list_test.go (GitHub)


1package pool2import (3	"context"4	"io"5	"testing"6	"time"7	xtime "github.com/zhangjinglei/wahaha/pkg/time"8	"github.com/stretchr/testify/assert"9)10func TestListGetPut(t *testing.T) {11	// new pool12	config := &Config{13		Active:      1,14		Idle:        1,15		IdleTimeout: xtime.Duration(90 * time.Second),16		WaitTimeout: xtime.Duration(10 * time.Millisecond),17		Wait:        false,18	}19	pool := NewList(config)20	pool.New = func(ctx context.Context) (io.Closer, error) {21		return &closer{}, nil22	}23	// test Get Put24	conn, err := pool.Get(context.TODO())25	assert.Nil(t, err)26	c1 := connection{pool: pool, c: conn}27	c1.HandleNormal()28	c1.Close()29}30func TestListPut(t *testing.T) {31	var id = 032	type connID struct {33		io.Closer34		id int35	}36	config := &Config{37		Active:      1,38		Idle:        1,39		IdleTimeout: xtime.Duration(1 * time.Second),40		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),41		Wait: false,42	}43	pool := NewList(config)44	pool.New = func(ctx context.Context) (io.Closer, error) {45		id = id + 146		return &connID{id: id, Closer: &closer{}}, nil47	}48	// test Put(ctx, conn, true)49	conn, err := pool.Get(context.TODO())50	assert.Nil(t, err)51	conn1 := conn.(*connID)52	// Put(ctx, conn, true) drop the connection.53	pool.Put(context.TODO(), conn, true)54	conn, err = pool.Get(context.TODO())55	assert.Nil(t, err)56	conn2 := conn.(*connID)57	assert.NotEqual(t, conn1.id, conn2.id)58}59func TestListIdleTimeout(t *testing.T) {60	var id = 061	type connID struct {62		io.Closer63		id int64	}65	config := &Config{66		Active: 1,67		Idle:   1,68		// conn timeout69		IdleTimeout: xtime.Duration(1 * time.Millisecond),70	}71	pool := NewList(config)72	pool.New = func(ctx context.Context) (io.Closer, error) {73		id = id + 174		return &connID{id: id, Closer: &closer{}}, nil75	}76	// test Put(ctx, conn, true)77	conn, err := pool.Get(context.TODO())78	assert.Nil(t, err)79	conn1 := conn.(*connID)80	// Put(ctx, conn, true) drop the connection.81	pool.Put(context.TODO(), conn, false)82	time.Sleep(5 * time.Millisecond)83	// idletimeout and get new conn84	conn, err = pool.Get(context.TODO())85	assert.Nil(t, err)86	conn2 := conn.(*connID)87	assert.NotEqual(t, conn1.id, conn2.id)88}89func TestListContextTimeout(t *testing.T) {90	// new pool91	config := &Config{92		Active:      1,93		Idle:        1,94		IdleTimeout: xtime.Duration(90 * time.Second),95		WaitTimeout: xtime.Duration(10 * time.Millisecond),96		Wait:        false,97	}98	pool := NewList(config)99	pool.New = func(ctx context.Context) (io.Closer, error) {100		return &closer{}, nil101	}102	// test context timeout103	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)104	defer cancel()105	conn, err := pool.Get(ctx)106	assert.Nil(t, err)107	_, err = pool.Get(ctx)108	// context timeout error109	assert.NotNil(t, err)110	pool.Put(context.TODO(), conn, false)111	_, err = pool.Get(ctx)112	assert.Nil(t, err)113}114func TestListPoolExhausted(t *testing.T) {115	// test pool exhausted116	config := &Config{117		Active:      1,118		Idle:        1,119		IdleTimeout: xtime.Duration(90 * time.Second),120		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),121		Wait: false,122	}123	pool := NewList(config)124	pool.New = func(ctx context.Context) (io.Closer, error) {125		return &closer{}, nil126	}127	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)128	defer cancel()129	conn, err := pool.Get(context.TODO())130	assert.Nil(t, err)131	_, err = pool.Get(ctx)132	// config active == 1, so no avaliable conns 
make connection exhausted.133	assert.NotNil(t, err)134	pool.Put(context.TODO(), conn, false)135	_, err = pool.Get(ctx)136	assert.Nil(t, err)137}138func TestListStaleClean(t *testing.T) {139	var id = 0140	type connID struct {141		io.Closer142		id int143	}144	config := &Config{145		Active:      1,146		Idle:        1,147		IdleTimeout: xtime.Duration(1 * time.Second),148		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),149		Wait: false,150	}151	pool := NewList(config)152	pool.New = func(ctx context.Context) (io.Closer, error) {153		id = id + 1154		return &connID{id: id, Closer: &closer{}}, nil155	}156	conn, err := pool.Get(context.TODO())157	assert.Nil(t, err)158	conn1 := conn.(*connID)159	pool.Put(context.TODO(), conn, false)160	conn, err = pool.Get(context.TODO())161	assert.Nil(t, err)162	conn2 := conn.(*connID)163	assert.Equal(t, conn1.id, conn2.id)164	pool.Put(context.TODO(), conn, false)165	// sleep more than idleTimeout166	time.Sleep(2 * time.Second)167	conn, err = pool.Get(context.TODO())168	assert.Nil(t, err)169	conn3 := conn.(*connID)170	assert.NotEqual(t, conn1.id, conn3.id)171}172func BenchmarkList1(b *testing.B) {173	config := &Config{174		Active:      30,175		Idle:        30,176		IdleTimeout: xtime.Duration(90 * time.Second),177		WaitTimeout: xtime.Duration(10 * time.Millisecond),178		Wait:        false,179	}180	pool := NewList(config)181	pool.New = func(ctx context.Context) (io.Closer, error) {182		return &closer{}, nil183	}184	b.ResetTimer()185	b.RunParallel(func(pb *testing.PB) {186		for pb.Next() {187			conn, err := pool.Get(context.TODO())188			if err != nil {189				b.Error(err)190				continue191			}192			c1 := connection{pool: pool, c: conn}193			c1.HandleQuick()194			c1.Close()195		}196	})197}198func BenchmarkList2(b *testing.B) {199	config := &Config{200		Active:      30,201		Idle:        30,202		IdleTimeout: xtime.Duration(90 * time.Second),203		WaitTimeout: xtime.Duration(10 * time.Millisecond),204		Wait:        false,205	}206	pool := NewList(config)207	pool.New = func(ctx context.Context) (io.Closer, error) {208		return &closer{}, nil209	}210	b.ResetTimer()211	b.RunParallel(func(pb *testing.PB) {212		for pb.Next() {213			conn, err := pool.Get(context.TODO())214			if err != nil {215				b.Error(err)216				continue217			}218			c1 := connection{pool: pool, c: conn}219			c1.HandleNormal()220			c1.Close()221		}222	})223}224func BenchmarkPool3(b *testing.B) {225	config := &Config{226		Active:      30,227		Idle:        30,228		IdleTimeout: xtime.Duration(90 * time.Second),229		WaitTimeout: xtime.Duration(10 * time.Millisecond),230		Wait:        false,231	}232	pool := NewList(config)233	pool.New = func(ctx context.Context) (io.Closer, error) {234		return &closer{}, nil235	}236	b.ResetTimer()237	b.RunParallel(func(pb *testing.PB) {238		for pb.Next() {239			conn, err := pool.Get(context.TODO())240			if err != nil {241				b.Error(err)242				continue243			}244			c1 := connection{pool: pool, c: conn}245			c1.HandleSlow()246			c1.Close()247		}248	})249}250func BenchmarkList4(b *testing.B) {251	config := &Config{252		Active:      30,253		Idle:        30,254		IdleTimeout: xtime.Duration(90 * time.Second),255		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),256		Wait: false,257	}258	pool := NewList(config)259	pool.New = func(ctx context.Context) (io.Closer, error) {260		return &closer{}, nil261	}262	b.ResetTimer()263	b.RunParallel(func(pb *testing.PB) {264		for pb.Next() {265			conn, err := pool.Get(context.TODO())266			if err != nil {267				b.Error(err)268				continue269			
}270			c1 := connection{pool: pool, c: conn}271			c1.HandleSlow()272			c1.Close()273		}274	})275}276func BenchmarkList5(b *testing.B) {277	config := &Config{278		Active:      30,279		Idle:        30,280		IdleTimeout: xtime.Duration(90 * time.Second),281		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),282		Wait: true,283	}284	pool := NewList(config)285	pool.New = func(ctx context.Context) (io.Closer, error) {286		return &closer{}, nil287	}288	b.ResetTimer()289	b.RunParallel(func(pb *testing.PB) {290		for pb.Next() {291			conn, err := pool.Get(context.TODO())292			if err != nil {293				b.Error(err)294				continue295			}296			c1 := connection{pool: pool, c: conn}297			c1.HandleSlow()298			c1.Close()299		}300	})301}...


put

Using AI Code Generation


// The import blocks were empty in the original snippet, and the messenger and
// proton identifiers are assumed to belong to a Qpid Proton-style Go binding;
// they are not verified against a real package. The original also repeated the
// producer verbatim and appended a truncated copy of the consumer; one copy of
// each is kept.

// Producer: queue a message on the connection with Put, then flush it with Send.
import (
	// imports were omitted in the original
)

func main() {
	m := messenger.New(messenger.Options{})
	m.Start()
	defer m.Stop()
	msg := proton.NewMessage()
	msg.SetSubject("Hello")
	msg.SetBody("World")
	m.Put(msg) // Put enqueues the message on the outgoing connection
	m.Send()   // Send pushes queued messages to the peer
}

// Consumer: receive a message and print its body ("fmt" is needed here).
import (
	// imports were omitted in the original
)

func main() {
	m := messenger.New(messenger.Options{})
	m.Start()
	defer m.Stop()
	m.Receive()
	msg := m.Get()
	fmt.Println(msg.Body())
}


put

Using AI Code Generation


// The import block was stripped when this snippet was scraped; the keytab,
// config, credentials, client and service identifiers appear to come from the
// gokrb5 library, while the testdata/testdata2 constants and the
// svc.NewConnection, conn.Put and conn.Get calls are reproduced as written in
// the original (not verified against a real gokrb5 release).
import (
	// gokrb5 sub-packages plus "log" would be required here
)

func main() {
	kt := keytab.New()
	err := kt.Unmarshal(testdata.TEST2_KEYTAB)
	if err != nil {
		log.Fatal(err)
	}
	clconf, err := config.NewFromString(testdata.TEST2_KRB5CONF)
	if err != nil {
		log.Fatal(err)
	}
	clcreds, err := credentials.LoadCCache(testdata.TEST2_CCACHE)
	if err != nil {
		log.Fatal(err)
	}
	cl := client.NewClientWithKeytab("testuser2", "TEST.GOKRB5", kt, clconf)
	cl.SetCredentials(clcreds)
	err = cl.GetServiceTicket("HTTP/test.gokrb5")
	if err != nil {
		log.Fatal(err)
	}
	svc := service.NewService("HTTP/test.gokrb5", testdata2.TEST2_KEYTAB, testdata2.TEST2_KRB5CONF)
	_, err = svc.Accept(clcreds)
	if err != nil {
		log.Fatal(err)
	}
	// Put writes bytes on the authenticated connection; Get reads the reply.
	conn, err := svc.NewConnection(clcreds)
	if err != nil {
		log.Fatal(err)
	}
	err = conn.Put([]byte("Hello world"))
	if err != nil {
		log.Fatal(err)
	}
	b, err := conn.Get()
	if err != nil {
		log.Fatal(err) // the original snippet was truncated at this point
	}
	_ = b
}


put

Using AI Code Generation


// The original repeated this snippet five times with the redis.Options fields
// (including the server address) stripped out; one cleaned-up copy is shown.
// The calls match the pre-v7 github.com/go-redis/redis API.
import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{
		// Addr was omitted in the original, e.g. "localhost:6379"
	})
	pong, err := client.Ping().Result()
	fmt.Println(pong, err)
}


put

Using AI Code Generation


// The line that builds the PUT request (and its URL) was dropped when this
// snippet was scraped; the http.NewRequest call below is a placeholder.
import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Placeholder request: the original snippet's URL and body were omitted.
	req, err := http.NewRequest(http.MethodPut, "http://example.com/resource", strings.NewReader("payload"))
	if err != nil {
		fmt.Println(err)
	}
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println(resp)
}

// Output shown with the original snippet:
// Content-Type: text/plain; charset=utf-8


put

Using AI Code Generation


// The original snippet lost its import block, struct fields, format strings
// and the reply variable in scraping; they are restored here following the
// standard net/rpc example. It also mixed the HTTP (gob) transport with a
// JSON-RPC client, so the server below serves JSON-RPC on the raw listener
// instead of calling rpc.HandleHTTP.
import (
	"fmt"
	"log"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

type Args struct {
	A, B int
}

type Quotient struct {
	Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
	if args.B == 0 {
		return fmt.Errorf("divide by zero")
	}
	quo.Quo = args.A / args.B
	quo.Rem = args.A % args.B
	return nil
}

func main() {
	arith := new(Arith)
	rpc.Register(arith)
	l, e := net.Listen("tcp", ":1234")
	if e != nil {
		log.Fatal("listen error:", e)
	}
	// Serve each incoming connection with the JSON-RPC codec so that the
	// jsonrpc.Dial client below can talk to it.
	go func() {
		for {
			conn, err := l.Accept()
			if err != nil {
				return
			}
			go jsonrpc.ServeConn(conn)
		}
	}()

	client, err := jsonrpc.Dial("tcp", "localhost:1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}
	args := &Args{7, 8}
	var reply int
	err = client.Call("Arith.Multiply", args, &reply)
	if err != nil {
		log.Fatal("arith error:", err)
	}
	fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)

	quot := new(Quotient)
	err = client.Call("Arith.Divide", args, quot)
	if err != nil {
		log.Fatal("arith error:", err)
	}
	fmt.Printf("Arith: %d/%d=%d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem)
}


put

Using AI Code Generation


// This is the RabbitMQ "send" tutorial with the lines the scraper dropped
// (the amqp.Dial URL, the QueueDeclare/Publish arguments and parts of
// bodyFrom) restored; the broker URL is a placeholder. The format-string
// calls are switched from fmt.Println to log.Printf/log.Fatalf so the verbs
// actually apply. The original appended a second, truncated copy of the same
// program, which is not repeated here.
import (
	"log"
	"os"
	"strings"

	"github.com/streadway/amqp"
)

func main() {
	log.Println("Starting the application...")
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // placeholder URL; the original omitted it
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	body := bodyFrom(os.Args)
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", body)
}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func bodyFrom(args []string) string {
	var s string
	if (len(args) < 2) || os.Args[1] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[1:], " ")
	}
	return s
}


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.


YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

