How to use put method of conn Package

Best Gauge code snippet using conn.put

pubconnection.go

Source:pubconnection.go Github

copy

Full Screen

...16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN19// THE SOFTWARE.20package inputhost21import (22 "sync"23 "sync/atomic"24 "time"25 "github.com/uber-common/bark"26 "github.com/uber/cherami-server/common"27 "github.com/uber/cherami-server/common/metrics"28 "github.com/uber/cherami-server/common/throttler"29 "github.com/uber/cherami-server/services/inputhost/load"30 serverStream "github.com/uber/cherami-server/stream"31 "github.com/uber/cherami-thrift/.generated/go/cherami"32)33// logHighLatencyThreshold is the time threshold for logging34const logHighLatencyThreshold = time.Second35var logThrottler = throttler.New(1, 5*time.Second) // log only once every 5 seconds at max36var logsThrottled int3237type (38 pubConnection struct {39 connID connectionID40 destinationPath string41 stream serverStream.BInOpenPublisherStreamInCall42 logger bark.Logger43 reconfigureClientCh chan *reconfigInfo44 putMsgCh chan *inPutMessage45 cacheTimeout time.Duration46 ackChannel chan *cherami.PutMessageAck47 replyCh chan response48 closeChannel chan struct{} // this is the channel which is used to actually close the stream49 waitWG sync.WaitGroup50 notifyCloseCh chan connectionID // this is used to notify the path cache to remove us from its list51 doneCh chan bool // this is used to unblock the OpenPublisherStream()52 recvMsgs int64 // total msgs received53 sentAcks int64 // total acks sent out54 sentNacks int64 // total n-acks sent out55 sentThrottled int64 // total msgs that were throttled56 failedMsgs int64 // total inflight msgs that were 'failed'57 connTokenBucketValue atomic.Value // Value to controll access for TB for rate limit Num of Msgs received per sec58 connMsgsLimitPerSecond int32 //per second rate limit for this connection59 lk sync.Mutex60 opened bool61 closed bool62 limitsEnabled bool63 pathCache *inPathCache64 pathWG *sync.WaitGroup65 }66 response struct {67 ackID string // this is unique identifier of message68 userContext map[string]string // this is user specified context to pass through69 putMsgRecvTime time.Time // this is the msg receive time, used for latency metrics70 }71 // inPutMessage is the wrapper struct which holds the actual message and72 // the channel to get the reply73 // XXX: Note any changes to this struct should be made on the74 // PutMessageBatch() API in services/inputhost/inpthost.go75 inPutMessage struct {76 putMsg *cherami.PutMessage77 putMsgAckCh chan *cherami.PutMessageAck78 putMsgRecvTime time.Time79 }80 earlyReplyAck struct {81 // time when we receive ack from replica82 ackReceiveTime time.Time83 // time when we send ack back to stream84 ackSentTime time.Time85 }86 reconfigInfo struct {87 updateUUID string88 cmdType cherami.InputHostCommandType89 drainWG *sync.WaitGroup90 }91 pubConnectionClosedCb func(connectionID)92)93// failTimeout is the timeout to wait for acks from the store when a94// stream is closed95// if we don't get any acks back fail the messages96const failTimeout = 3 * time.Second97// reconfigClientChSize is the size of the reconfigClientCh98const reconfigClientChSize = 5099// perConnMsgsLimitPerSecond is the rate limit per connection100const perConnMsgsLimitPerSecond = 10000101func newPubConnection(destinationPath string, stream serverStream.BInOpenPublisherStreamInCall, pathCache *inPathCache, m3Client metrics.Client, limitsEnabled bool, timeout time.Duration, doneCh chan bool) *pubConnection {102 conn := &pubConnection{103 connID: pathCache.currID,104 destinationPath: destinationPath,105 logger: pathCache.logger.WithFields(bark.Fields{106 common.TagInPubConnID: common.FmtInPubConnID(int(pathCache.currID)),107 common.TagModule: `pubConn`,108 }),109 stream: stream,110 putMsgCh: pathCache.putMsgCh,111 cacheTimeout: timeout,112 //perConnTokenBucket: common.NewTokenBucket(perConnMsgsLimitPerSecond, common.NewRealTimeSource()),113 replyCh: make(chan response, defaultBufferSize),114 reconfigureClientCh: make(chan *reconfigInfo, reconfigClientChSize),115 ackChannel: make(chan *cherami.PutMessageAck, defaultBufferSize),116 closeChannel: make(chan struct{}),117 notifyCloseCh: pathCache.notifyConnsCloseCh,118 doneCh: doneCh,119 limitsEnabled: limitsEnabled,120 pathCache: pathCache,121 pathWG: &pathCache.connsWG,122 }123 conn.SetMsgsLimitPerSecond(common.HostPerConnMsgsLimitPerSecond)124 return conn125}126func (conn *pubConnection) open() {127 conn.lk.Lock()128 defer conn.lk.Unlock()129 if !conn.opened {130 conn.waitWG.Add(2)131 conn.pathWG.Add(1) // this makes the manage routine in the pathCache is alive132 go conn.readRequestStream()133 go conn.writeAcksStream()134 conn.opened = true135 conn.logger.Info("pubConn opened")136 }137}138func (conn *pubConnection) close() {139 conn.lk.Lock()140 if conn.closed {141 conn.lk.Unlock()142 return143 }144 close(conn.closeChannel)145 conn.closed = true146 conn.waitWG.Wait()147 // we have successfully closed the connection148 // make sure we update the ones who are waiting for us149 select {150 case conn.doneCh <- true:151 default:152 }153 conn.lk.Unlock()154 // notify the patch cache to remove this conn155 // from the cache. No need to hold the lock156 // for this.157 conn.notifyCloseCh <- conn.connID158 // set the wait group for the pathCache to be done159 conn.pathWG.Done()160 conn.logger.WithFields(bark.Fields{161 `sentAcks`: conn.sentAcks,162 `sentNacks`: conn.sentNacks,163 `sentThrottled`: conn.sentThrottled,164 `failedMsgs`: conn.failedMsgs,165 }).Info("pubConn closed")166}167// readRequestStream is the pump which reads messages from the client stream.168// this sends the message to the next layer (extHost) and then also prepares169// the writeAcksStream by sending a message to the intermediary "replyCh"170func (conn *pubConnection) readRequestStream() {171 defer conn.waitWG.Done()172 var msgLength int64173 // Setup the connIdleTimer174 connIdleTimer := common.NewTimer(conn.cacheTimeout)175 defer connIdleTimer.Stop()176 for {177 connIdleTimer.Reset(conn.cacheTimeout)178 select {179 case <-connIdleTimer.C:180 conn.logger.WithField(`cacheTimeout`, conn.cacheTimeout).Info(`client connection idle for : ; closing it`)181 go conn.close()182 return183 default:184 msg, err := conn.stream.Read()185 if err != nil {186 conn.logger.WithField(common.TagErr, err).Info(`inputhost: PublisherStream closed on read error`)187 // conn.close() can block waiting for all routines to188 // go away. make sure we don't deadlock189 go conn.close()190 return191 }192 // record the counter metric193 conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageReceived)194 conn.pathCache.destM3Client.IncCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageReceived)195 conn.pathCache.dstMetrics.Increment(load.DstMetricOverallNumMsgs)196 // Note: we increment the destination bytes in counter here because we could throttle this message197 // even before it reaches any of the extents (which increments the extent specific bytes in counter)198 msgLength = int64(len(msg.Data))199 conn.pathCache.dstMetrics.Add(load.DstMetricBytesIn, msgLength)200 conn.pathCache.hostMetrics.Add(load.HostMetricBytesIn, msgLength)201 conn.pathCache.destM3Client.AddCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageReceivedBytes, msgLength)202 conn.recvMsgs++203 inMsg := &inPutMessage{204 putMsg: msg,205 putMsgAckCh: conn.ackChannel,206 putMsgRecvTime: time.Now(),207 }208 throttled := false209 if conn.limitsEnabled {210 consumed, _ := conn.GetConnTokenBucketValue().TryConsume(1)211 throttled = !consumed212 if throttled {213 // just send a THROTTLED status back to the client214 conn.logger.Warn("throttling due to rate violation")215 conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageLimitThrottled)216 conn.pathCache.destM3Client.IncCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageLimitThrottled)217 inMsg.putMsgAckCh <- &cherami.PutMessageAck{218 ID: common.StringPtr(msg.GetID()),219 UserContext: msg.GetUserContext(),220 Status: common.CheramiStatusPtr(cherami.Status_THROTTLED),221 Message: common.StringPtr("throttling; inputhost is busy"),222 }223 }224 }225 if !throttled {226 if conn.limitsEnabled {227 // if sending to this channel is blocked we need to return a throttle error to the client228 select {229 case conn.putMsgCh <- inMsg:230 // populate the inflight map231 conn.replyCh <- response{msg.GetID(), msg.GetUserContext(), inMsg.putMsgRecvTime}232 default:233 conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageChannelFullThrottled)234 conn.pathCache.destM3Client.IncCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageChannelFullThrottled)235 // just send a THROTTLED status back to the client236 conn.logger.Warn("throttling due to putMsgCh being filled")237 inMsg.putMsgAckCh <- &cherami.PutMessageAck{238 ID: common.StringPtr(msg.GetID()),239 UserContext: msg.GetUserContext(),240 Status: common.CheramiStatusPtr(cherami.Status_THROTTLED),241 Message: common.StringPtr("throttling; inputhost is busy"),242 }243 }244 } else {245 select {246 case conn.putMsgCh <- inMsg:247 conn.replyCh <- response{msg.GetID(), msg.GetUserContext(), inMsg.putMsgRecvTime}248 case <-conn.closeChannel:249 // we are shutting down here. just return250 return251 }252 }253 }254 }255 }256}257// writeAcksStream is the pump which sends acks back to the client.258// we read from the intermediary "replyCh", and populate the259// "inflightMessages" map. this is done to serve 2 purposes.260// 1. read from ackChannel only of necessary261// 2. make sure we respond failure back to the client in case something262// happens and we close the streams underneath263func (conn *pubConnection) writeAcksStream() {264 earlyReplyAcks := make(map[string]earlyReplyAck) // map of all the early acks265 inflightMessages := make(map[string]response)266 defer conn.failInflightMessages(inflightMessages, earlyReplyAcks)267 flushTicker := time.NewTicker(common.FlushTimeout) // start ticker to flush tchannel stream268 defer flushTicker.Stop()269 unflushedWrites := 0270 for {271 select {272 case resCh := <-conn.replyCh:273 // First check if we have already seen the ack for this ID274 if _, ok := earlyReplyAcks[resCh.ackID]; ok {275 // We already received the ack for this msgID. Complete the request immediately.276 conn.updateEarlyReplyAcks(resCh, earlyReplyAcks)277 } else {278 inflightMessages[resCh.ackID] = resCh279 }280 case <-flushTicker.C:281 if unflushedWrites > 0 {282 if err := conn.flushCmdToClient(unflushedWrites); err != nil {283 // since flush failed, trigger a close of the connection which will fail inflight messages284 go conn.close()285 }286 unflushedWrites = 0287 }288 default:289 if len(inflightMessages) == 0 {290 select {291 case resCh := <-conn.replyCh:292 // First check if we have already seen the ack for this ID293 if _, ok := earlyReplyAcks[resCh.ackID]; ok {294 // We already received the ack for this msgID. Complete the request immediately.295 conn.updateEarlyReplyAcks(resCh, earlyReplyAcks)296 } else {297 inflightMessages[resCh.ackID] = resCh298 }299 case rInfo := <-conn.reconfigureClientCh:300 // record the counter metric301 conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostReconfClientRequests)302 cmd := createReconfigureCmd(rInfo)303 if err := conn.writeCmdToClient(cmd, rInfo.drainWG); err != nil {304 // trigger a close of the connection305 go conn.close()306 return307 }308 unflushedWrites++309 // we will flush this in our next interval. Since this is just a reconfig310 case <-flushTicker.C:311 if unflushedWrites > 0 {312 if err := conn.flushCmdToClient(unflushedWrites); err != nil {313 // since flush failed, trigger a close of the connection which will fail inflight messages314 go conn.close()315 return316 }317 unflushedWrites = 0318 }319 case <-conn.closeChannel:320 return321 }322 } else {323 select {324 case ack, ok := <-conn.ackChannel:325 if ok {326 ackReceiveTime := time.Now()327 exists, err := conn.writeAckToClient(inflightMessages, ack, ackReceiveTime)328 if err != nil {329 // trigger a close of the connection330 go conn.close()331 return332 }333 if !exists {334 // we received an ack even before we populated the inflight map335 // put it in the earlyReplyAcks and remove it when we get the inflight map336 // XXX: log disabled to reduce spew337 // conn.logger.338 // WithField(common.TagInPutAckID, common.FmtInPutAckID(ack.GetID())).339 // Debug("received an ack even before we populated the inflight map")340 earlyReplyAcks[ack.GetID()] = earlyReplyAck{341 ackReceiveTime: ackReceiveTime,342 ackSentTime: time.Now(),343 }344 }345 unflushedWrites++346 if unflushedWrites > common.FlushThreshold {347 if err = conn.flushCmdToClient(unflushedWrites); err != nil {348 // since flush failed, trigger a close of the connection which will fail inflight messages349 go conn.close()350 return351 }352 unflushedWrites = 0353 }354 } else {355 return356 }357 case rInfo := <-conn.reconfigureClientCh:358 // record the counter metric359 conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostReconfClientRequests)360 cmd := createReconfigureCmd(rInfo)361 if err := conn.writeCmdToClient(cmd, rInfo.drainWG); err != nil {362 // trigger a close of the connection363 go conn.close()364 return365 }366 unflushedWrites++367 // we will flush this in our next interval. Since this is just a reconfig368 case <-flushTicker.C:369 if unflushedWrites > 0 {370 if err := conn.flushCmdToClient(unflushedWrites); err != nil {371 // since flush failed, trigger a close of the connection which will fail inflight messages372 go conn.close()373 return374 }375 unflushedWrites = 0376 }377 case <-conn.closeChannel:378 return379 }380 }381 }382 }383}384func (conn *pubConnection) failInflightMessages(inflightMessages map[string]response, earlyReplyAcks map[string]earlyReplyAck) {385 defer conn.stream.Done()386 failTimer := common.NewTimer(failTimeout)387 defer failTimer.Stop()388 // make sure we wait for all the messages for some timeout period and fail only if necessary389 // we only iterate through the inflightMessages map because the earlyAcksMap is390 // only there for updating metrics properly and since we are failing here, we don't care.391 for quit := false; !quit && len(inflightMessages) > 0; {392 select {393 case ack, ok := <-conn.ackChannel:394 if ok {395 conn.writeAckToClient(inflightMessages, ack, time.Now())396 // ignore error above since we are anyway failing397 // Since we are anyway failing here, we don't care about the398 // early acks map since the only point for that map is to399 // update metric400 }401 case resCh := <-conn.replyCh:402 // First check if we have already seen the ack for this ID403 // We do the check here to make sure we don't incorrectly populate404 // the inflight messages map and timeout those messages down below405 // after out failTimeout elapses.406 // This situation can happen, if we just sent an ack above in the normal407 // path and have not yet populated the infight map and closed the connection408 if _, ok := earlyReplyAcks[resCh.ackID]; ok {409 // We already received the ack for this msgID. Complete the request immediately.410 conn.updateEarlyReplyAcks(resCh, earlyReplyAcks)411 } else {412 inflightMessages[resCh.ackID] = resCh413 }414 case <-failTimer.C:415 conn.logger.WithField(`inflightMessages`, len(inflightMessages)).Info(`inputhost: timing out messages`)416 quit = true417 }418 }419 // send a failure to all the remaining inflight messages420 for id, resp := range inflightMessages {421 if _, ok := earlyReplyAcks[id]; !ok {422 putMsgAck := &cherami.PutMessageAck{423 ID: common.StringPtr(id),424 UserContext: resp.userContext,425 Status: common.CheramiStatusPtr(cherami.Status_FAILED),426 Message: common.StringPtr("inputhost: timing out unacked message"),427 }428 d := time.Since(resp.putMsgRecvTime)429 conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageBeforeAckLatency, d)430 conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageBeforeAckLatency, d)431 conn.stream.Write(createAckCmd(putMsgAck))432 d = time.Since(resp.putMsgRecvTime)433 conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageLatency, d)434 conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageLatency, d)435 if d > logHighLatencyThreshold {436 if logThrottler.Allow() {437 conn.logger.WithFields(bark.Fields{438 common.TagDstPth: common.FmtDstPth(conn.destinationPath),439 common.TagInPutAckID: common.FmtInPutAckID(id),440 `duration`: d,441 `putMsgChanLen`: len(conn.putMsgCh),442 `putMsgAckChanLen`: len(conn.ackChannel),443 `replyChanLen`: len(conn.replyCh),444 `throttled`: atomic.SwapInt32(&logsThrottled, 0),445 }).Error(`failInflightMessages: publish message latency`)446 } else {447 atomic.AddInt32(&logsThrottled, 1)448 }449 }450 // Record the number of failed messages451 conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageFailures)452 conn.pathCache.destM3Client.IncCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageFailures)453 conn.failedMsgs++454 conn.pathCache.dstMetrics.Increment(load.DstMetricNumFailed)455 }456 }457 // flush whatever we have458 conn.stream.Flush()459 conn.waitWG.Done()460}461func (conn *pubConnection) flushCmdToClient(unflushedWrites int) (err error) {462 if err = conn.stream.Flush(); err != nil {463 conn.logger.WithFields(bark.Fields{common.TagErr: err, `unflushedWrites`: unflushedWrites}).Error(`inputhost: error flushing messages to client stream failed`)464 // since flush failed, trigger a close of the connection which will fail inflight messages465 }466 return467}468func (conn *pubConnection) writeCmdToClient(cmd *cherami.InputHostCommand, drainWG *sync.WaitGroup) (err error) {469 if err = conn.stream.Write(cmd); err != nil {470 conn.logger.WithFields(bark.Fields{`cmd`: cmd, common.TagErr: err}).Info(`inputhost: Unable to Write cmd back to client`)471 }472 if cmd.GetType() == cherami.InputHostCommandType_DRAIN {473 // if this is a DRAIN command wait for some timeout period and then474 // just close the connection475 // We do this to make sure the connection object doesn't hang around476 // at this point we have sent the DRAIN command to the client477 // XXX: assert non-nil WG478 go conn.waitForDrain(drainWG)479 }480 return481}482// waitForDrain is a safety routine to make sure we don't hold onto the connection object483// In a normal scenario, when all the extents hosted on this inputhost for the destination is484// drained properly, it will automatically trigger a connection close, which will clean up the485// connection object.486// But if another extent gets assigned to this inputhost before the drain of an older extent487// completes, then the pathCache will not unload which means this object won't be closed, while the488// client would have already stopped writing on this connection. In that case, we will hit the489// timeout and close the object anyway.490func (conn *pubConnection) waitForDrain(drainWG *sync.WaitGroup) {491 // wait for some time to set the WG to be done.492 connWGTimer := common.NewTimer(connWGTimeout)493 defer connWGTimer.Stop()494 select {495 case <-conn.closeChannel:496 drainWG.Done()497 return498 case <-connWGTimer.C:499 drainWG.Done()500 }501 // now the WG is *unblocked*; wait for the actual drain502 // from the exthost layer to finish for a timeout period503 drainTimer := common.NewTimer(defaultDrainTimeout)504 defer drainTimer.Stop()505 // just wait for a minute, for all the inflight messages to drain506 select {507 case <-conn.closeChannel:508 // if it is closed already, just return509 return510 case <-drainTimer.C:511 conn.logger.Warn("timed out waiting for drain to finish. just closing the connection")512 go conn.close()513 }514}515func (conn *pubConnection) writeAckToClient(inflightMessages map[string]response, ack *cherami.PutMessageAck, ackReceiveTime time.Time) (exists bool, err error) {516 cmd := createAckCmd(ack)517 err = conn.writeCmdToClient(cmd, nil)518 if err != nil {519 conn.logger.520 WithField(common.TagInPutAckID, common.FmtInPutAckID(ack.GetID())).521 WithField(common.TagErr, err).Error(`inputhost: writing ack Id failed`)522 }523 exists = conn.updateInflightMap(inflightMessages, ack.GetID(), ackReceiveTime)524 // update the failure metric, if needed525 if ack.GetStatus() == cherami.Status_FAILED || err != nil {526 conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageFailures)527 conn.pathCache.destM3Client.IncCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageFailures)528 }529 if err == nil {530 switch ack.GetStatus() {531 case cherami.Status_OK:532 conn.sentAcks++533 conn.pathCache.dstMetrics.Increment(load.DstMetricNumAcks)534 case cherami.Status_FAILED:535 conn.sentNacks++536 conn.pathCache.dstMetrics.Increment(load.DstMetricNumNacks)537 case cherami.Status_THROTTLED:538 conn.sentThrottled++539 conn.pathCache.dstMetrics.Increment(load.DstMetricNumThrottled)540 }541 }542 return543}544func (conn *pubConnection) updateInflightMap(inflightMessages map[string]response, ackID string, ackReceiveTime time.Time) bool {545 if resp, ok := inflightMessages[ackID]; ok {546 // record the latency547 d := time.Since(resp.putMsgRecvTime)548 conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageLatency, d)549 conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageLatency, d)550 conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageBeforeAckLatency, ackReceiveTime.Sub(resp.putMsgRecvTime))551 conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageBeforeAckLatency, ackReceiveTime.Sub(resp.putMsgRecvTime))552 if d > logHighLatencyThreshold {553 if logThrottler.Allow() {554 conn.logger.WithFields(bark.Fields{555 common.TagDstPth: common.FmtDstPth(conn.destinationPath),556 common.TagInPutAckID: common.FmtInPutAckID(ackID),557 `d`: d,558 `putMsgChanLen`: len(conn.putMsgCh),559 `putMsgAckChanLen`: len(conn.ackChannel),560 `replyChanLen`: len(conn.replyCh),561 `throttled`: atomic.SwapInt32(&logsThrottled, 0),562 }).Info(`publish message latency at updateInflightMap`)563 } else {564 atomic.AddInt32(&logsThrottled, 1)565 }566 }567 delete(inflightMessages, ackID)568 return ok569 }570 // didn't find it in the inflight map, which means we got an ack even before we populated it571 return false572}573func (conn *pubConnection) updateEarlyReplyAcks(resCh response, earlyReplyAcks map[string]earlyReplyAck) {574 // make sure we account for the time when we sent the ack as well575 d := time.Since(resCh.putMsgRecvTime)576 ack, _ := earlyReplyAcks[resCh.ackID]577 actualDuration := d - time.Since(ack.ackSentTime)578 if d > logHighLatencyThreshold {579 if logThrottler.Allow() {580 conn.logger.WithFields(bark.Fields{581 common.TagDstPth: common.FmtDstPth(conn.destinationPath),582 common.TagInPutAckID: common.FmtInPutAckID(resCh.ackID),583 `d`: d,584 `actualDuration`: actualDuration,585 `putMsgChanLen`: len(conn.putMsgCh),586 `putMsgAckChanLen`: len(conn.ackChannel),587 `replyChanLen`: len(conn.replyCh),588 `throttled`: atomic.SwapInt32(&logsThrottled, 0),589 }).Info(`publish message latency at updateEarlyReplyAcks and actualDuration`)590 } else {591 atomic.AddInt32(&logsThrottled, 1)592 }593 }594 conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageLatency, actualDuration)595 conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageLatency, actualDuration)596 // also record the time that excludes the time for sending ack back to socket597 actualDurationExcludeSocket := d - time.Since(ack.ackReceiveTime)598 conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageBeforeAckLatency, actualDurationExcludeSocket)599 conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageBeforeAckLatency, actualDurationExcludeSocket)600 delete(earlyReplyAcks, resCh.ackID)601 // XXX: Disabled due to log noise602 // conn.logger.WithField(common.TagInPutAckID, common.FmtInPutAckID(resCh.ackID)).Info("Found ack for this response in earlyReplyAcks map. Not adding it to the inflight map")603}604func createReconfigureCmd(rInfo *reconfigInfo) *cherami.InputHostCommand {605 cmd := cherami.NewInputHostCommand()606 cmd.Reconfigure = &cherami.ReconfigureInfo{UpdateUUID: common.StringPtr(rInfo.updateUUID)}607 cmd.Type = common.CheramiInputHostCommandTypePtr(rInfo.cmdType)608 return cmd609}610func createAckCmd(ack *cherami.PutMessageAck) *cherami.InputHostCommand {611 cmd := cherami.NewInputHostCommand()612 cmd.Ack = ack613 cmd.Type = common.CheramiInputHostCommandTypePtr(cherami.InputHostCommandType_ACK)614 return cmd615}616// GetMsgsLimitPerSecond gets msgs rate limit per second for this connection617func (conn *pubConnection) GetMsgsLimitPerSecond() int {618 return int(atomic.LoadInt32(&conn.connMsgsLimitPerSecond))619}620// SetMsgsLimitPerSecond sets msgs rate limit per second for this connection621func (conn *pubConnection) SetMsgsLimitPerSecond(connLimit int32) {622 atomic.StoreInt32(&conn.connMsgsLimitPerSecond, connLimit)623 conn.SetConnTokenBucketValue(int32(connLimit))624}625// GetConnTokenBucketValue gets token bucket for connMsgsLimitPerSecond626func (conn *pubConnection) GetConnTokenBucketValue() common.TokenBucket {627 return conn.connTokenBucketValue.Load().(common.TokenBucket)...

Full Screen

Full Screen

slice_test.go

Source:slice_test.go Github

copy

Full Screen

1package pool2import (3 "context"4 "io"5 "testing"6 "time"7 xtime "github.com/zhangjinglei/wahaha/pkg/time"8 "github.com/stretchr/testify/assert"9)10type closer struct {11}12func (c *closer) Close() error {13 return nil14}15type connection struct {16 c io.Closer17 pool Pool18}19func (c *connection) HandleQuick() {20 // time.Sleep(1 * time.Millisecond)21}22func (c *connection) HandleNormal() {23 time.Sleep(20 * time.Millisecond)24}25func (c *connection) HandleSlow() {26 time.Sleep(500 * time.Millisecond)27}28func (c *connection) Close() {29 c.pool.Put(context.Background(), c.c, false)30}31func TestSliceGetPut(t *testing.T) {32 // new pool33 config := &Config{34 Active: 1,35 Idle: 1,36 IdleTimeout: xtime.Duration(90 * time.Second),37 WaitTimeout: xtime.Duration(10 * time.Millisecond),38 Wait: false,39 }40 pool := NewSlice(config)41 pool.New = func(ctx context.Context) (io.Closer, error) {42 return &closer{}, nil43 }44 // test Get Put45 conn, err := pool.Get(context.TODO())46 assert.Nil(t, err)47 c1 := connection{pool: pool, c: conn}48 c1.HandleNormal()49 c1.Close()50}51func TestSlicePut(t *testing.T) {52 var id = 053 type connID struct {54 io.Closer55 id int56 }57 config := &Config{58 Active: 1,59 Idle: 1,60 IdleTimeout: xtime.Duration(1 * time.Second),61 // WaitTimeout: xtime.Duration(10 * time.Millisecond),62 Wait: false,63 }64 pool := NewSlice(config)65 pool.New = func(ctx context.Context) (io.Closer, error) {66 id = id + 167 return &connID{id: id, Closer: &closer{}}, nil68 }69 // test Put(ctx, conn, true)70 conn, err := pool.Get(context.TODO())71 assert.Nil(t, err)72 conn1 := conn.(*connID)73 // Put(ctx, conn, true) drop the connection.74 pool.Put(context.TODO(), conn, true)75 conn, err = pool.Get(context.TODO())76 assert.Nil(t, err)77 conn2 := conn.(*connID)78 assert.NotEqual(t, conn1.id, conn2.id)79}80func TestSliceIdleTimeout(t *testing.T) {81 var id = 082 type connID struct {83 io.Closer84 id int85 }86 config := &Config{87 Active: 1,88 Idle: 1,89 // conn timeout90 IdleTimeout: xtime.Duration(1 * time.Millisecond),91 }92 pool := NewSlice(config)93 pool.New = func(ctx context.Context) (io.Closer, error) {94 id = id + 195 return &connID{id: id, Closer: &closer{}}, nil96 }97 // test Put(ctx, conn, true)98 conn, err := pool.Get(context.TODO())99 assert.Nil(t, err)100 conn1 := conn.(*connID)101 // Put(ctx, conn, true) drop the connection.102 pool.Put(context.TODO(), conn, false)103 time.Sleep(5 * time.Millisecond)104 // idletimeout and get new conn105 conn, err = pool.Get(context.TODO())106 assert.Nil(t, err)107 conn2 := conn.(*connID)108 assert.NotEqual(t, conn1.id, conn2.id)109}110func TestSliceContextTimeout(t *testing.T) {111 // new pool112 config := &Config{113 Active: 1,114 Idle: 1,115 IdleTimeout: xtime.Duration(90 * time.Second),116 WaitTimeout: xtime.Duration(10 * time.Millisecond),117 Wait: false,118 }119 pool := NewSlice(config)120 pool.New = func(ctx context.Context) (io.Closer, error) {121 return &closer{}, nil122 }123 // test context timeout124 ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)125 defer cancel()126 conn, err := pool.Get(ctx)127 assert.Nil(t, err)128 _, err = pool.Get(ctx)129 // context timeout error130 assert.NotNil(t, err)131 pool.Put(context.TODO(), conn, false)132 _, err = pool.Get(ctx)133 assert.Nil(t, err)134}135func TestSlicePoolExhausted(t *testing.T) {136 // test pool exhausted137 config := &Config{138 Active: 1,139 Idle: 1,140 IdleTimeout: xtime.Duration(90 * time.Second),141 // WaitTimeout: xtime.Duration(10 * time.Millisecond),142 Wait: false,143 }144 pool := NewSlice(config)145 pool.New = func(ctx context.Context) (io.Closer, error) {146 return &closer{}, nil147 }148 ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)149 defer cancel()150 conn, err := pool.Get(context.TODO())151 assert.Nil(t, err)152 _, err = pool.Get(ctx)153 // config active == 1, so no avaliable conns make connection exhausted.154 assert.NotNil(t, err)155 pool.Put(context.TODO(), conn, false)156 _, err = pool.Get(ctx)157 assert.Nil(t, err)158}159func TestSliceStaleClean(t *testing.T) {160 var id = 0161 type connID struct {162 io.Closer163 id int164 }165 config := &Config{166 Active: 1,167 Idle: 1,168 IdleTimeout: xtime.Duration(1 * time.Second),169 // WaitTimeout: xtime.Duration(10 * time.Millisecond),170 Wait: false,171 }172 pool := NewList(config)173 pool.New = func(ctx context.Context) (io.Closer, error) {174 id = id + 1175 return &connID{id: id, Closer: &closer{}}, nil176 }177 conn, err := pool.Get(context.TODO())178 assert.Nil(t, err)179 conn1 := conn.(*connID)180 pool.Put(context.TODO(), conn, false)181 conn, err = pool.Get(context.TODO())182 assert.Nil(t, err)183 conn2 := conn.(*connID)184 assert.Equal(t, conn1.id, conn2.id)185 pool.Put(context.TODO(), conn, false)186 // sleep more than idleTimeout187 time.Sleep(2 * time.Second)188 conn, err = pool.Get(context.TODO())189 assert.Nil(t, err)190 conn3 := conn.(*connID)191 assert.NotEqual(t, conn1.id, conn3.id)192}193func BenchmarkSlice1(b *testing.B) {194 config := &Config{195 Active: 30,196 Idle: 30,197 IdleTimeout: xtime.Duration(90 * time.Second),198 WaitTimeout: xtime.Duration(10 * time.Millisecond),199 Wait: false,200 }201 pool := NewSlice(config)202 pool.New = func(ctx context.Context) (io.Closer, error) {203 return &closer{}, nil204 }205 b.ResetTimer()206 b.RunParallel(func(pb *testing.PB) {207 for pb.Next() {208 conn, err := pool.Get(context.TODO())209 if err != nil {210 b.Error(err)211 continue212 }213 c1 := connection{pool: pool, c: conn}214 c1.HandleQuick()215 c1.Close()216 }217 })218}219func BenchmarkSlice2(b *testing.B) {220 config := &Config{221 Active: 30,222 Idle: 30,223 IdleTimeout: xtime.Duration(90 * time.Second),224 WaitTimeout: xtime.Duration(10 * time.Millisecond),225 Wait: false,226 }227 pool := NewSlice(config)228 pool.New = func(ctx context.Context) (io.Closer, error) {229 return &closer{}, nil230 }231 b.ResetTimer()232 b.RunParallel(func(pb *testing.PB) {233 for pb.Next() {234 conn, err := pool.Get(context.TODO())235 if err != nil {236 b.Error(err)237 continue238 }239 c1 := connection{pool: pool, c: conn}240 c1.HandleNormal()241 c1.Close()242 }243 })244}245func BenchmarkSlice3(b *testing.B) {246 config := &Config{247 Active: 30,248 Idle: 30,249 IdleTimeout: xtime.Duration(90 * time.Second),250 WaitTimeout: xtime.Duration(10 * time.Millisecond),251 Wait: false,252 }253 pool := NewSlice(config)254 pool.New = func(ctx context.Context) (io.Closer, error) {255 return &closer{}, nil256 }257 b.ResetTimer()258 b.RunParallel(func(pb *testing.PB) {259 for pb.Next() {260 conn, err := pool.Get(context.TODO())261 if err != nil {262 b.Error(err)263 continue264 }265 c1 := connection{pool: pool, c: conn}266 c1.HandleSlow()267 c1.Close()268 }269 })270}271func BenchmarkSlice4(b *testing.B) {272 config := &Config{273 Active: 30,274 Idle: 30,275 IdleTimeout: xtime.Duration(90 * time.Second),276 // WaitTimeout: xtime.Duration(10 * time.Millisecond),277 Wait: false,278 }279 pool := NewSlice(config)280 pool.New = func(ctx context.Context) (io.Closer, error) {281 return &closer{}, nil282 }283 b.ResetTimer()284 b.RunParallel(func(pb *testing.PB) {285 for pb.Next() {286 conn, err := pool.Get(context.TODO())287 if err != nil {288 b.Error(err)289 continue290 }291 c1 := connection{pool: pool, c: conn}292 c1.HandleSlow()293 c1.Close()294 }295 })296}297func BenchmarkSlice5(b *testing.B) {298 config := &Config{299 Active: 30,300 Idle: 30,301 IdleTimeout: xtime.Duration(90 * time.Second),302 // WaitTimeout: xtime.Duration(10 * time.Millisecond),303 Wait: true,304 }305 pool := NewSlice(config)306 pool.New = func(ctx context.Context) (io.Closer, error) {307 return &closer{}, nil308 }309 b.ResetTimer()310 b.RunParallel(func(pb *testing.PB) {311 for pb.Next() {312 conn, err := pool.Get(context.TODO())313 if err != nil {314 b.Error(err)315 continue316 }317 c1 := connection{pool: pool, c: conn}318 c1.HandleSlow()319 c1.Close()320 }321 })322}...

Full Screen

Full Screen

list_test.go

Source:list_test.go Github

copy

Full Screen

1package pool2import (3 "context"4 "io"5 "testing"6 "time"7 xtime "github.com/zhangjinglei/wahaha/pkg/time"8 "github.com/stretchr/testify/assert"9)10func TestListGetPut(t *testing.T) {11 // new pool12 config := &Config{13 Active: 1,14 Idle: 1,15 IdleTimeout: xtime.Duration(90 * time.Second),16 WaitTimeout: xtime.Duration(10 * time.Millisecond),17 Wait: false,18 }19 pool := NewList(config)20 pool.New = func(ctx context.Context) (io.Closer, error) {21 return &closer{}, nil22 }23 // test Get Put24 conn, err := pool.Get(context.TODO())25 assert.Nil(t, err)26 c1 := connection{pool: pool, c: conn}27 c1.HandleNormal()28 c1.Close()29}30func TestListPut(t *testing.T) {31 var id = 032 type connID struct {33 io.Closer34 id int35 }36 config := &Config{37 Active: 1,38 Idle: 1,39 IdleTimeout: xtime.Duration(1 * time.Second),40 // WaitTimeout: xtime.Duration(10 * time.Millisecond),41 Wait: false,42 }43 pool := NewList(config)44 pool.New = func(ctx context.Context) (io.Closer, error) {45 id = id + 146 return &connID{id: id, Closer: &closer{}}, nil47 }48 // test Put(ctx, conn, true)49 conn, err := pool.Get(context.TODO())50 assert.Nil(t, err)51 conn1 := conn.(*connID)52 // Put(ctx, conn, true) drop the connection.53 pool.Put(context.TODO(), conn, true)54 conn, err = pool.Get(context.TODO())55 assert.Nil(t, err)56 conn2 := conn.(*connID)57 assert.NotEqual(t, conn1.id, conn2.id)58}59func TestListIdleTimeout(t *testing.T) {60 var id = 061 type connID struct {62 io.Closer63 id int64 }65 config := &Config{66 Active: 1,67 Idle: 1,68 // conn timeout69 IdleTimeout: xtime.Duration(1 * time.Millisecond),70 }71 pool := NewList(config)72 pool.New = func(ctx context.Context) (io.Closer, error) {73 id = id + 174 return &connID{id: id, Closer: &closer{}}, nil75 }76 // test Put(ctx, conn, true)77 conn, err := pool.Get(context.TODO())78 assert.Nil(t, err)79 conn1 := conn.(*connID)80 // Put(ctx, conn, true) drop the connection.81 pool.Put(context.TODO(), conn, false)82 time.Sleep(5 * time.Millisecond)83 // idletimeout and get new conn84 conn, err = pool.Get(context.TODO())85 assert.Nil(t, err)86 conn2 := conn.(*connID)87 assert.NotEqual(t, conn1.id, conn2.id)88}89func TestListContextTimeout(t *testing.T) {90 // new pool91 config := &Config{92 Active: 1,93 Idle: 1,94 IdleTimeout: xtime.Duration(90 * time.Second),95 WaitTimeout: xtime.Duration(10 * time.Millisecond),96 Wait: false,97 }98 pool := NewList(config)99 pool.New = func(ctx context.Context) (io.Closer, error) {100 return &closer{}, nil101 }102 // test context timeout103 ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)104 defer cancel()105 conn, err := pool.Get(ctx)106 assert.Nil(t, err)107 _, err = pool.Get(ctx)108 // context timeout error109 assert.NotNil(t, err)110 pool.Put(context.TODO(), conn, false)111 _, err = pool.Get(ctx)112 assert.Nil(t, err)113}114func TestListPoolExhausted(t *testing.T) {115 // test pool exhausted116 config := &Config{117 Active: 1,118 Idle: 1,119 IdleTimeout: xtime.Duration(90 * time.Second),120 // WaitTimeout: xtime.Duration(10 * time.Millisecond),121 Wait: false,122 }123 pool := NewList(config)124 pool.New = func(ctx context.Context) (io.Closer, error) {125 return &closer{}, nil126 }127 ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)128 defer cancel()129 conn, err := pool.Get(context.TODO())130 assert.Nil(t, err)131 _, err = pool.Get(ctx)132 // config active == 1, so no avaliable conns make connection exhausted.133 assert.NotNil(t, err)134 pool.Put(context.TODO(), conn, false)135 _, err = pool.Get(ctx)136 assert.Nil(t, err)137}138func TestListStaleClean(t *testing.T) {139 var id = 0140 type connID struct {141 io.Closer142 id int143 }144 config := &Config{145 Active: 1,146 Idle: 1,147 IdleTimeout: xtime.Duration(1 * time.Second),148 // WaitTimeout: xtime.Duration(10 * time.Millisecond),149 Wait: false,150 }151 pool := NewList(config)152 pool.New = func(ctx context.Context) (io.Closer, error) {153 id = id + 1154 return &connID{id: id, Closer: &closer{}}, nil155 }156 conn, err := pool.Get(context.TODO())157 assert.Nil(t, err)158 conn1 := conn.(*connID)159 pool.Put(context.TODO(), conn, false)160 conn, err = pool.Get(context.TODO())161 assert.Nil(t, err)162 conn2 := conn.(*connID)163 assert.Equal(t, conn1.id, conn2.id)164 pool.Put(context.TODO(), conn, false)165 // sleep more than idleTimeout166 time.Sleep(2 * time.Second)167 conn, err = pool.Get(context.TODO())168 assert.Nil(t, err)169 conn3 := conn.(*connID)170 assert.NotEqual(t, conn1.id, conn3.id)171}172func BenchmarkList1(b *testing.B) {173 config := &Config{174 Active: 30,175 Idle: 30,176 IdleTimeout: xtime.Duration(90 * time.Second),177 WaitTimeout: xtime.Duration(10 * time.Millisecond),178 Wait: false,179 }180 pool := NewList(config)181 pool.New = func(ctx context.Context) (io.Closer, error) {182 return &closer{}, nil183 }184 b.ResetTimer()185 b.RunParallel(func(pb *testing.PB) {186 for pb.Next() {187 conn, err := pool.Get(context.TODO())188 if err != nil {189 b.Error(err)190 continue191 }192 c1 := connection{pool: pool, c: conn}193 c1.HandleQuick()194 c1.Close()195 }196 })197}198func BenchmarkList2(b *testing.B) {199 config := &Config{200 Active: 30,201 Idle: 30,202 IdleTimeout: xtime.Duration(90 * time.Second),203 WaitTimeout: xtime.Duration(10 * time.Millisecond),204 Wait: false,205 }206 pool := NewList(config)207 pool.New = func(ctx context.Context) (io.Closer, error) {208 return &closer{}, nil209 }210 b.ResetTimer()211 b.RunParallel(func(pb *testing.PB) {212 for pb.Next() {213 conn, err := pool.Get(context.TODO())214 if err != nil {215 b.Error(err)216 continue217 }218 c1 := connection{pool: pool, c: conn}219 c1.HandleNormal()220 c1.Close()221 }222 })223}224func BenchmarkPool3(b *testing.B) {225 config := &Config{226 Active: 30,227 Idle: 30,228 IdleTimeout: xtime.Duration(90 * time.Second),229 WaitTimeout: xtime.Duration(10 * time.Millisecond),230 Wait: false,231 }232 pool := NewList(config)233 pool.New = func(ctx context.Context) (io.Closer, error) {234 return &closer{}, nil235 }236 b.ResetTimer()237 b.RunParallel(func(pb *testing.PB) {238 for pb.Next() {239 conn, err := pool.Get(context.TODO())240 if err != nil {241 b.Error(err)242 continue243 }244 c1 := connection{pool: pool, c: conn}245 c1.HandleSlow()246 c1.Close()247 }248 })249}250func BenchmarkList4(b *testing.B) {251 config := &Config{252 Active: 30,253 Idle: 30,254 IdleTimeout: xtime.Duration(90 * time.Second),255 // WaitTimeout: xtime.Duration(10 * time.Millisecond),256 Wait: false,257 }258 pool := NewList(config)259 pool.New = func(ctx context.Context) (io.Closer, error) {260 return &closer{}, nil261 }262 b.ResetTimer()263 b.RunParallel(func(pb *testing.PB) {264 for pb.Next() {265 conn, err := pool.Get(context.TODO())266 if err != nil {267 b.Error(err)268 continue269 }270 c1 := connection{pool: pool, c: conn}271 c1.HandleSlow()272 c1.Close()273 }274 })275}276func BenchmarkList5(b *testing.B) {277 config := &Config{278 Active: 30,279 Idle: 30,280 IdleTimeout: xtime.Duration(90 * time.Second),281 // WaitTimeout: xtime.Duration(10 * time.Millisecond),282 Wait: true,283 }284 pool := NewList(config)285 pool.New = func(ctx context.Context) (io.Closer, error) {286 return &closer{}, nil287 }288 b.ResetTimer()289 b.RunParallel(func(pb *testing.PB) {290 for pb.Next() {291 conn, err := pool.Get(context.TODO())292 if err != nil {293 b.Error(err)294 continue295 }296 c1 := connection{pool: pool, c: conn}297 c1.HandleSlow()298 c1.Close()299 }300 })301}...

Full Screen

Full Screen

put

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 m := messenger.New(messenger.Options{})4 m.Start()5 defer m.Stop()6 msg := proton.NewMessage()7 msg.SetSubject("Hello")8 msg.SetBody("World")9 m.Put(msg)10 m.Send()11}12import (13func main() {14 m := messenger.New(messenger.Options{})15 m.Start()16 defer m.Stop()17 m.Receive()18 msg := m.Get()19 fmt.Println(msg.Body())20}21import (22func main() {23 m := messenger.New(messenger.Options{})24 m.Start()25 defer m.Stop()26 msg := proton.NewMessage()27 msg.SetSubject("Hello")28 msg.SetBody("World")29 m.Put(msg)30 m.Send()31}32import (33func main() {34 m := messenger.New(messenger.Options{})35 m.Start()36 defer m.Stop()37 m.Receive()

Full Screen

Full Screen

put

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 kt := keytab.New()4 err := kt.Unmarshal(testdata.TEST2_KEYTAB)5 if err != nil {6 log.Fatal(err)7 }8 clconf, err := config.NewFromString(testdata.TEST2_KRB5CONF)9 if err != nil {10 log.Fatal(err)11 }12 clcreds, err := credentials.LoadCCache(testdata.TEST2_CCACHE)13 if err != nil {14 log.Fatal(err)15 }16 cl := client.NewClientWithKeytab("testuser2", "TEST.GOKRB5", kt, clconf)17 cl.SetCredentials(clcreds)18 err = cl.GetServiceTicket("HTTP/test.gokrb5")19 if err != nil {20 log.Fatal(err)21 }22 svc := service.NewService("HTTP/test.gokrb5", testdata2.TEST2_KEYTAB, testdata2.TEST2_KRB5CONF)23 _, err = svc.Accept(clcreds)24 if err != nil {25 log.Fatal(err)26 }27 conn, err := svc.NewConnection(clcreds)28 if err != nil {29 log.Fatal(err)30 }31 err = conn.Put([]byte("Hello world"))32 if err != nil {33 log.Fatal(err)34 }35 b, err := conn.Get()36 if err != nil {

Full Screen

Full Screen

put

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 client := redis.NewClient(&redis.Options{4 })5 pong, err := client.Ping().Result()6 fmt.Println(pong, err)7}8import (9func main() {10 client := redis.NewClient(&redis.Options{11 })12 pong, err := client.Ping().Result()13 fmt.Println(pong, err)14}15import (16func main() {17 client := redis.NewClient(&redis.Options{18 })19 pong, err := client.Ping().Result()20 fmt.Println(pong, err)21}22import (23func main() {24 client := redis.NewClient(&redis.Options{25 })26 pong, err := client.Ping().Result()27 fmt.Println(pong, err)28}29import (30func main() {31 client := redis.NewClient(&redis.Options{32 })33 pong, err := client.Ping().Result()34 fmt.Println(pong, err)35}36import (

Full Screen

Full Screen

put

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 if err != nil {4 fmt.Println(err)5 }6 client := &http.Client{}7 resp, err := client.Do(req)8 if err != nil {9 fmt.Println(err)10 }11 fmt.Println(resp)12}13Content-Type: text/plain; charset=utf-8

Full Screen

Full Screen

put

Using AI Code Generation

copy

Full Screen

1import (2type Args struct {3}4type Quotient struct {5}6func (t *Arith) Multiply(args *Args, reply *int) error {7}8func (t *Arith) Divide(args *Args, quo *Quotient) error {9 if args.B == 0 {10 return fmt.Errorf("divide by zero")11 }12}13func main() {14 arith := new(Arith)15 rpc.Register(arith)16 rpc.HandleHTTP()17 l, e := net.Listen("tcp", ":1234")18 if e != nil {19 log.Fatal("listen error:", e)20 }21 go http.Serve(l, nil)22 client, err := jsonrpc.Dial("tcp", "localhost:1234")23 if err != nil {24 log.Fatal("dialing:", err)25 }26 args := &Args{7, 8}27 err = client.Call("Arith.Multiply", args, &reply)28 if err != nil {29 log.Fatal("arith error:", err)30 }31 fmt.Printf("Arith: %d*%d=%d32 quot := new(Quotient)33 err = client.Call("Arith.Divide", args, &quot)34 if err != nil {35 log.Fatal("arith error:", err)36 }37 fmt.Printf("Arith: %d/%d=%d remainder %d38}

Full Screen

Full Screen

put

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 fmt.Println("Starting the application...")4 failOnError(err, "Failed to connect to RabbitMQ")5 defer conn.Close()6 ch, err := conn.Channel()7 failOnError(err, "Failed to open a channel")8 defer ch.Close()9 q, err := ch.QueueDeclare(10 failOnError(err, "Failed to declare a queue")11 body := bodyFrom(os.Args)12 err = ch.Publish(13 amqp.Publishing{14 Body: []byte(body),15 })16 failOnError(err, "Failed to publish a message")17 fmt.Println(" [x] Sent %s", body)18}19func failOnError(err error, msg string) {20 if err != nil {21 fmt.Println("%s: %s", msg, err)22 }23}24func bodyFrom(args []string) string {25 if (len(args) < 2) || os.Args[1] == "" {26 } else {27 s = strings.Join(args[1:], " ")28 }29}30import (31func main() {32 fmt.Println("Starting the application...")33 failOnError(err, "Failed to connect to RabbitMQ")34 defer conn.Close()35 ch, err := conn.Channel()36 failOnError(err, "Failed to open a channel")37 defer ch.Close()38 q, err := ch.QueueDeclare(

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful