How to use the GetFailure method of the internal package

Best Ginkgo code snippet using internal.GetFailure

server.go

Source: server.go (GitHub)


// Copyright (c) 2015 Western Digital Corporation or its affiliates. All rights reserved.
// SPDX-License-Identifier: MIT
package tractserver
import (
	"context"
	"errors"
	"net/http"
	"sync"
	"time"
	log "github.com/golang/glog"
	"github.com/westerndigitalcorporation/blb/internal/core"
	"github.com/westerndigitalcorporation/blb/internal/server"
	"github.com/westerndigitalcorporation/blb/pkg/failures"
	"github.com/westerndigitalcorporation/blb/pkg/rpc"
)
type curatorInfo struct {
	// These may be stale; a curator in one repl group could be scheduled on a machine,
	// die, get scheduled elsewhere, and a curator in a diff. repl group could be brought
	// up on that same machine.
	partitions []core.PartitionID
}
// Server is the RPC server for the TractStore
type Server struct {
	// The actual data storage.
	store *Store
	// Configuration parameters.
	cfg *Config
	// Aggregate connections to curators.
	ct CuratorTalker
	// The set of curators (indexed by address) that we heartbeat to.
	curators map[string]curatorInfo
	// Cached disk status.
	fsStatusCache []core.FsStatus
	// When was the cached disk status last updated.
	fsStatusCacheUpdated time.Time
	// Lock for 'curators', 'fsStatus' and 'fsStatusUpdated'.
	lock sync.Mutex
	// The connection to master.
	mc *RPCMasterConnection
	// Control handler.
	ctlHandler *TSCtlHandler
	// Service handler.
	srvHandler *TSSrvHandler
}
// NewServer creates a new Server. The server does not listen for or serve
// requests until Start() is called on it.
func NewServer(store *Store, ct CuratorTalker, mc *RPCMasterConnection, cfg *Config) *Server {
	return &Server{
		store:    store,
		ct:       ct,
		cfg:      cfg,
		curators: make(map[string]curatorInfo),
		mc:       mc,
	}
}
// Start starts the TractServer by launching goroutines to accept RPC requests.
func (s *Server) Start() (err error) {
	s.initTractserverID()
	// Set up status page.
	http.HandleFunc("/", s.statusHandler)
	//http.HandleFunc("/logs", health.HandleLogs)
	//http.HandleFunc("/loglevel", health.HandleLogsLevel)
	// Endpoint for shutting down the tractserver.
	http.HandleFunc("/_quit", server.QuitHandler)
	// Create control/service handlers.
	opm := server.NewOpMetric("tractserver_rpc", "rpc")
	s.ctlHandler = newTSCtlHandler(s, opm)
	s.srvHandler = newTSSrvHandler(s, opm)
	// Register the rpc handlers.
	if err = rpc.RegisterName("TSCtlHandler", s.ctlHandler); nil != err {
		return err
	}
	if err = rpc.RegisterName("TSSrvHandler", s.srvHandler); err != nil {
		return err
	}
	go s.masterHeartbeatLoop()
	go s.curatorHeartbeatLoop()
	log.Infof("tractserver id=%v listening on address %s", s.store.GetID(), s.cfg.Addr)
	err = http.ListenAndServe(s.cfg.Addr, nil) // this blocks forever
	log.Fatalf("http listener returned error: %v", err)
	return
}
// initTractserverID contacts the master to register this tractserver for service.
func (s *Server) initTractserverID() core.TractserverID {
	// We just started up and probably don't have any disks yet. Let's wait
	// until we add some disks.
	for seconds := 0; s.store.DiskCount() == 0; seconds++ {
		if seconds%60 == 3 {
			log.Infof("Waiting for disks to read TSID...")
		}
		time.Sleep(time.Second)
	}
	// Already registered.
	if id := s.store.GetID(); id.IsValid() {
		return id
	}
	// Check for manual override.
	if overrideID := core.TractserverID(s.cfg.OverrideID); overrideID.IsValid() {
		log.Infof("tractserver ID is being **OVERRIDDEN** to %d", overrideID)
		if _, err := s.store.SetID(overrideID); err != core.NoError {
			log.Fatalf("error storing OVERRIDDEN tractserver ID: %s", err)
		}
		return overrideID
	}
	// We haven't registered yet. Register with the master.
	for {
		reply, err := s.mc.RegisterTractserver(context.Background(), s.cfg.Addr)
		if err != core.NoError {
			log.Errorf("initTractserverID: failed to register tractserver with master, sleeping and retrying, err=%s", err)
			time.Sleep(s.cfg.RegistrationRetry)
			continue
		}
		log.Infof("registered TSID %s", reply.TSID)
		if id, err := s.store.SetID(reply.TSID); err != core.NoError {
			log.Fatalf("failed to persist the tractserver ID: %s", err)
		} else {
			return id
		}
	}
}
// masterHeartbeatLoop runs forever, sending heartbeats to the master.
func (s *Server) masterHeartbeatLoop() {
	masterTicker := time.NewTicker(s.cfg.MasterHeartbeatInterval)
	for {
		// Send the regularly scheduled heartbeat.
		beat, err := s.mc.MasterTractserverHeartbeat(context.Background(), s.store.GetID(), s.cfg.Addr, s.getStatusForHeartbeat())
		if err != core.NoError {
			log.Errorf("masterHeartbeatLoop: error sending heartbeat to master, sleeping and retrying, err=%s", err)
			time.Sleep(s.cfg.MasterHeartbeatRetry)
			continue
		}
		s.processMasterHeartbeat(beat)
		<-masterTicker.C
	}
}
// processMasterHeartbeat compares our view of active curators with the master's view,
// establishing connections to new curators and closing connections to stale curators.
func (s *Server) processMasterHeartbeat(beat core.MasterTractserverHeartbeatReply) {
	// Collect some stats for logging.
	alreadyConnected := 0
	newConnections := 0
	removedConnections := 0
	// We create a map of what's in the from-master heartbeat so that we can remove any
	// curators that aren't in it later.
	inBeat := make(map[string]bool)
	load := s.getLoad()
	// Using s.curators so we lock it.
	s.lock.Lock()
	for _, addr := range beat.Curators {
		inBeat[addr] = true
		if _, ok := s.curators[addr]; ok {
			alreadyConnected++
		} else {
			s.curators[addr] = curatorInfo{}
			go s.beatToCurator(nil, addr, nil, load)
			newConnections++
		}
	}
	for addr := range s.curators {
		if !inBeat[addr] {
			go s.ct.Close(addr)
			delete(s.curators, addr)
			removedConnections++
		}
	}
	s.lock.Unlock()
	if newConnections+removedConnections > 0 {
		log.Infof("processMasterHeartbeat: %d already connected to, %d new connections, %d dropped, total %d curator conns",
			alreadyConnected, newConnections, removedConnections, alreadyConnected+newConnections)
	}
}
// getLoad returns information about how loaded this TS is.
func (s *Server) getLoad() core.TractserverLoad {
	var load core.TractserverLoad
	for _, s := range s.getStatus() {
		load.NumTracts += s.NumTracts
		load.TotalSpace += s.TotalSpace
		// A disk's FS might have free space but we can't use it if the disk is unhealthy.
		if s.Status.Healthy && !s.Status.Full {
			load.AvailSpace += s.AvailSpace
		}
	}
	return load
}
// curatorHeartbeatLoop runs forever, regularly sending heartbeats to all curators in s.curators.
func (s *Server) curatorHeartbeatLoop() {
	// Maps from a partition ID to tracts that we store for that partition.
	// Used to send a subset of tracts to the right curator.
	var tractsByPart map[core.PartitionID][]core.TractID
	var shard uint64
	for range time.Tick(s.cfg.CuratorHeartbeatInterval) {
		// pickSubset removes a subset of tractsByPart. When it's out of tracts we refresh it.
		if len(tractsByPart) == 0 {
			tractsByPart = s.store.GetSomeTractsByPartition(shard)
			shard++
		}
		// Send the same load vector to all curators.
		load := s.getLoad()
		// Protect s.curators as we iterate over it.
		s.lock.Lock()
		log.V(2).Infof("sending heartbeats to %d curators", len(s.curators))
		for c, ci := range s.curators {
			subset := s.pickSubset(ci.partitions, tractsByPart)
			go s.beatToCurator(ci.partitions, c, subset, load)
		}
		s.lock.Unlock()
	}
}
// Pick a subset of the tracts in 'all', up to tractsPerCuratorHeartbeat, that have a partition in 'parts'.
func (s *Server) pickSubset(parts []core.PartitionID, all map[core.PartitionID][]core.TractID) []core.TractID {
	var ret []core.TractID
	for _, p := range parts {
		budget := s.cfg.TractsPerCuratorHeartbeat - len(ret)
		if budget <= 0 {
			break
		} else if budget < len(all[p]) {
			ret = append(ret, all[p][0:budget]...)
			all[p] = all[p][budget:]
		} else {
			ret = append(ret, all[p]...)
			delete(all, p)
		}
	}
	return ret
}
// beatToCurator sents a heartbeat to the given curator.
func (s *Server) beatToCurator(parts []core.PartitionID, curatorAddr string, tracts []core.TractID, load core.TractserverLoad) {
	// Pull out what's corrupt and managed by this curator.
	beat := core.CuratorTractserverHeartbeatReq{
		TSID:    s.store.GetID(),
		Addr:    s.cfg.Addr,
		Corrupt: s.store.GetBadTracts(parts, s.cfg.BadTractsPerHeartbeat),
		Has:     tracts,
		Load:    load,
	}
	parts, err := s.ct.CuratorTractserverHeartbeat(curatorAddr, beat)
	if err != nil {
		log.Errorf("failed to beat to curator at %s, err=%s", curatorAddr, err)
		return
	}
	log.V(2).Infof("beat to curator at %s, sent %d corrupt, %d owned, got partitions %v",
		curatorAddr, len(beat.Corrupt), len(beat.Has), parts)
	// Update which partitions the curator owns and remove reported bad
	// tracts from failures map.
	s.lock.Lock()
	s.curators[curatorAddr] = curatorInfo{partitions: parts}
	s.store.removeTractsFromFailures(beat.Corrupt)
	s.lock.Unlock()
}
// getStatus retrieves disk status -- used by getLoad and status page. The
// caller should not mutate the returned slice.
func (s *Server) getStatus() []core.FsStatus {
	s.lock.Lock()
	defer s.lock.Unlock()
	// If the cached status is too stale, go down to the store layer to
	// retrieve the actual status and refresh the cache.
	if time.Now().After(s.fsStatusCacheUpdated.Add(s.cfg.DiskStatusCacheTTL)) {
		s.fsStatusCache = s.store.getStatus()
		s.fsStatusCacheUpdated = time.Now()
	}
	return s.fsStatusCache
}
// getStatusForHeartbeat is like getStatus, but clears some fields we don't need
// to send to the master.
func (s *Server) getStatusForHeartbeat() []core.FsStatus {
	disks := s.getStatus()
	out := make([]core.FsStatus, len(disks))
	for i, d := range disks {
		out[i] = d
		out[i].Ops = nil
	}
	return out
}
//-----------------
// Control handler
//-----------------
// TSCtlHandler handles all control messages.
type TSCtlHandler struct {
	// When failure service is enabled, what errors failed operations should return.
	opFailure *server.OpFailure
	// The server.
	server *Server
	// The actual data store.
	store *Store
	// The semaphore which is used to limit the number of pending requests.
	pendingSem server.Semaphore
	// Metrics we collect.
	opm *server.OpMetric
}
// newTSCtlHandler creates a new TSCtlHandler.
func newTSCtlHandler(s *Server, opm *server.OpMetric) *TSCtlHandler {
	cfg := s.cfg
	handler := &TSCtlHandler{
		server:     s,
		store:      s.store,
		pendingSem: server.NewSemaphore(cfg.RejectCtlReqThreshold),
		opm:        opm,
	}
	if cfg.UseFailure {
		handler.opFailure = server.NewOpFailure()
		if err := failures.Register("ts_control_failure", handler.opFailure.Handler); err != nil {
			log.Errorf("failed to register failure service: %s", err)
		}
	}
	return handler
}
// SetVersion sets the version of a tract. This is done by the curator when it changes replication
// group membership.
func (h *TSCtlHandler) SetVersion(req core.SetVersionReq, reply *core.SetVersionReply) error {
	op := h.opm.Start("SetVersion")
	defer op.EndWithBlbError(&reply.Err)
	// Check failure service.
	if err := h.getFailure("SetVersion"); err != core.NoError {
		log.Errorf("SetVersion: failure service override, returning %s", err)
		*reply = core.SetVersionReply{Err: err}
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("SetVersion: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	if !h.store.HasID(req.TSID) {
		log.Infof("SetVersion: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())
		*reply = core.SetVersionReply{Err: core.ErrWrongTractserver}
		return nil
	}
	reply.NewVersion, reply.Err = h.store.SetVersion(req.ID, req.NewVersion, req.ConditionalStamp)
	log.Infof("SetVersion: req %+v reply %+v", req, *reply)
	return nil
}
// PullTract copies a tract from an existing tractserver.
func (h *TSCtlHandler) PullTract(req core.PullTractReq, reply *core.Error) error {
	op := h.opm.Start("PullTract")
	defer op.EndWithBlbError(reply)
	// Check failure service.
	if err := h.getFailure("PullTract"); err != core.NoError {
		log.Errorf("PullTract: failure service override, returning %s", err)
		*reply = err
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("PullTract: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	if !h.store.HasID(req.TSID) {
		log.Infof("PullTract: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())
		*reply = core.ErrWrongTractserver
		return nil
	}
	ctx := controlContext()
	*reply = h.store.PullTract(ctx, req.From, req.ID, req.Version)
	log.Infof("PullTract: req %+v reply %+v", req, *reply)
	return nil
}
// CheckTracts is a request from a curator to verify that we have the tracts the curator thinks we do.
func (h *TSCtlHandler) CheckTracts(req core.CheckTractsReq, reply *core.CheckTractsReply) error {
	op := h.opm.Start("CheckTracts")
	defer op.EndWithBlbError(&reply.Err)
	// Check failure service.
	if err := h.getFailure("CheckTracts"); err != core.NoError {
		log.Errorf("CheckTracts: failure service override, returning %s", err)
		*reply = core.CheckTractsReply{Err: err}
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("CheckTracts: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	if !h.store.HasID(req.TSID) {
		log.Infof("CheckTracts: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())
		*reply = core.CheckTractsReply{Err: core.ErrWrongTractserver}
		return nil
	}
	*reply = core.CheckTractsReply{Err: core.NoError}
	log.Infof("CheckTracts: req %d tracts reply %+v", len(req.Tracts), *reply)
	// Enqueue the check tracts request into the channel and the check will be
	// done by another goroutine.
	select {
	case h.store.checkTractsCh <- req.Tracts:
	case <-time.After(3 * time.Second):
		// Drop the check request after a certain timeout. This is fine given
		// curators will redo the check periodically.
		log.Errorf("Timeout on blocking on the check tracts channel, drop the CheckTracts request with %d tracts", len(req.Tracts))
		op.TooBusy()
	}
	return nil
}
// GCTract is a request to garbage collect the provided tracts as we don't need to store them anymore.
func (h *TSCtlHandler) GCTract(req core.GCTractReq, reply *core.Error) error {
	op := h.opm.Start("GCTract")
	defer op.EndWithBlbError(reply)
	if err := h.getFailure("GCTract"); err != core.NoError {
		log.Errorf("GCTract: failure service override, returning %s", err)
		*reply = err
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("GCTract: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	if !h.store.HasID(req.TSID) {
		log.Infof("GCTract: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())
		*reply = core.ErrWrongTractserver
		return nil
	}
	h.store.GCTracts(req.Old, req.Gone)
	*reply = core.NoError
	log.Infof("GCTract: req %+v reply %+v", req, *reply)
	return nil
}
// GetTSID returns the id of this tractserver. This is only used for testing
// purposes and not in production.
func (h *TSCtlHandler) GetTSID(req struct{}, reply *core.TractserverID) error {
	if id := h.store.GetID(); id.IsValid() {
		*reply = id
		return nil
	}
	return errors.New("unregistered tractserver")
}
// PackTracts instructs the tractserver to read a bunch of tracts from other
// tractservers and write them to one RS data chunk on the local tractserver.
func (h *TSCtlHandler) PackTracts(req core.PackTractsReq, reply *core.Error) error {
	op := h.opm.Start("PackTracts")
	defer op.EndWithBlbError(reply)
	if err := h.getFailure("PackTracts"); err != core.NoError {
		log.Errorf("PackTracts: failure service override, returning %s", err)
		*reply = err
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("PackTracts: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	if !h.store.HasID(req.TSID) {
		log.Infof("PackTracts: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())
		*reply = core.ErrWrongTractserver
		return nil
	}
	ctx := controlContext()
	*reply = h.store.PackTracts(ctx, req.Length, req.Tracts, req.ChunkID)
	log.Infof("PackTracts: req %+v reply %+v", req, *reply)
	return nil
}
// RSEncode instructs the tractserver to read a bunch of RS data chunks, perform
// the RS parity computation, and write out parity chunks to other tractservers.
func (h *TSCtlHandler) RSEncode(req core.RSEncodeReq, reply *core.Error) error {
	op := h.opm.Start("RSEncode")
	defer op.EndWithBlbError(reply)
	if err := h.getFailure("RSEncode"); err != core.NoError {
		log.Errorf("RSEncode: failure service override, returning %s", err)
		*reply = err
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("RSEncode: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	if !h.store.HasID(req.TSID) {
		log.Infof("RSEncode: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())
		*reply = core.ErrWrongTractserver
		return nil
	}
	ctx := controlContext()
	*reply = h.store.RSEncode(ctx, req.ChunkID, req.Length, req.Srcs, req.Dests, req.IndexMap)
	log.Infof("RSEncode: req %+v reply %+v", req, *reply)
	return nil
}
// CtlRead reads from a tract, bypassing request limits. CtlReads are not
// cancellable.
func (h *TSCtlHandler) CtlRead(req core.ReadReq, reply *core.ReadReply) error {
	op := h.opm.Start("CtlRead")
	defer op.EndWithBlbError(&reply.Err)
	if err := h.getFailure("CtlRead"); err != core.NoError {
		log.Errorf("CtlRead: failure service override, returning %s", err)
		*reply = core.ReadReply{Err: err}
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("CtlRead: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	ctx := controlContext()
	var b []byte
	b, reply.Err = h.store.Read(ctx, req.ID, req.Version, req.Len, req.Off)
	reply.Set(b, true)
	log.Infof("CtlRead: req %+v reply len %d Err %s", req, len(b), reply.Err)
	return nil
}
// CtlWrite does a write to a tract, bypassing request limits.
// Unlike regular Write, CtlWrite creates the tract when Off is zero.
func (h *TSCtlHandler) CtlWrite(req core.WriteReq, reply *core.Error) error {
	op := h.opm.Start("CtlWrite")
	defer op.EndWithBlbError(reply)
	if err := h.getFailure("CtlWrite"); err != core.NoError {
		log.Errorf("CtlWrite: failure service override, returning %s", err)
		*reply = err
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("CtlWrite: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	// Currently, we only support CtlWrite for RS chunks.
	if req.ID.Blob.Partition().Type() != core.RSPartition {
		*reply = core.ErrInvalidArgument
		return nil
	}
	// TODO: This feels a little messy. Is there a better way to do this?
	// (Without adding a whole new CtlCreate path?)
	ctx := controlContext()
	if req.Off == 0 {
		*reply = h.store.Create(ctx, req.ID, req.B, req.Off)
	} else {
		*reply = h.store.Write(ctx, req.ID, req.Version, req.B, req.Off)
	}
	lenB := len(req.B)
	rpc.PutBuffer(req.Get())
	log.Infof("CtlWrite: req ID %v Version %v len(B) %d, Off %d, reply %+v", req.ID, req.Version, lenB, req.Off, *reply)
	return nil
}
// CtlStatTract returns the size of a tract.
func (h *TSCtlHandler) CtlStatTract(req core.StatTractReq, reply *core.StatTractReply) error {
	op := h.opm.Start("CtlStatTract")
	defer op.EndWithBlbError(&reply.Err)
	// Check failure service.
	if err := h.getFailure("StatTract"); err != core.NoError {
		log.Errorf("StatTract: failure service override, returning %s", err)
		*reply = core.StatTractReply{Err: err}
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("CtlStatTract: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	ctx := controlContext()
	reply.Size, reply.ModStamp, reply.Err = h.store.Stat(ctx, req.ID, req.Version)
	log.Infof("CtlStatTract: req %+v reply %+v", req, *reply)
	return nil
}
func (h *TSCtlHandler) rpcStats() map[string]string {
	return h.opm.Strings(
		"CheckTracts",
		"GCTract",
		"PullTract",
		"SetVersion",
		"PackTracts",
		"RSEncode",
		"CtlRead",
		"CtlWrite",
		"CtlStatTract",
	)
}
// Return the error registered with the given operation 'op', if any.
func (h *TSCtlHandler) getFailure(op string) core.Error {
	if nil == h.opFailure {
		return core.NoError
	}
	return h.opFailure.Get(op)
}
//-----------------
// Service handler
//-----------------
// errBusy is returned if there are too many pending requests.
var errBusy = errors.New("the server is too busy to serve this request")
// TSSrvHandler handles all client requests.
type TSSrvHandler struct {
	// When failure service is enabled, what errors failed operations should return.
	opFailure *server.OpFailure
	// The server.
	server *Server
	// The actual data store.
	store *Store
	// The semaphore which is used to limit the number of pending requests.
	pendingSem server.Semaphore
	// Per-RPC info.
	opm      *server.OpMetric
	inFlight *opTracker
}
// newTSSrvHandler creates a new TSSrvHandler.
func newTSSrvHandler(s *Server, opm *server.OpMetric) *TSSrvHandler {
	cfg := s.cfg
	handler := &TSSrvHandler{
		server:     s,
		store:      s.store,
		pendingSem: server.NewSemaphore(cfg.RejectReqThreshold),
		opm:        opm,
		inFlight:   newOpTracker(),
	}
	if cfg.UseFailure {
		handler.opFailure = server.NewOpFailure()
		if err := failures.Register("ts_service_failure", handler.opFailure.Handler); err != nil {
			log.Errorf("failed to register failure service: %s", err)
		}
	}
	return handler
}
// CreateTract creates a tract and writes data to it. Upon success, the tract
// can be further Read/Write'd, and will have version 1.
func (h *TSSrvHandler) CreateTract(req core.CreateTractReq, reply *core.Error) error {
	op := h.opm.Start("CreateTract")
	defer op.EndWithBlbError(reply)
	// Check failure service.
	if err := h.getFailure("CreateTract"); err != core.NoError {
		log.Errorf("Create: failure service override, returning %s", err)
		*reply = err
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("CreateTract: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	if !h.store.HasID(req.TSID) {
		log.Infof("CreateTract: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())
		*reply = core.ErrWrongTractserver
		return nil
	}
	ctx := contextWithPriority(context.Background(), mapPriority(req.Pri))
	*reply = h.store.Create(ctx, req.ID, req.B, req.Off)
	lenB := len(req.B)
	rpc.PutBuffer(req.Get())
	log.Infof("Create: req ID %v len(B) %d, Off %d, reply %+v", req.ID, lenB, req.Off, *reply)
	return nil
}
// Write does a write to a tract on this tractserver.
func (h *TSSrvHandler) Write(req core.WriteReq, reply *core.Error) error {
	op := h.opm.Start("Write")
	defer op.EndWithBlbError(reply)
	// Check failure service.
	if err := h.getFailure("Write"); err != core.NoError {
		log.Errorf("Write: failure service override, returning %s", err)
		*reply = err
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("Write: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	// Make sure the client can cancel this op by adding it to the in-flight table.
	ctx := h.inFlight.start(req.ReqID)
	if ctx == nil {
		log.Errorf("Write: new request w/existing ReqID, rejecting. req: %+v", req)
		return errBusy
	}
	defer h.inFlight.end(req.ReqID)
	ctx = contextWithPriority(ctx, mapPriority(req.Pri))
	*reply = h.store.Write(ctx, req.ID, req.Version, req.B, req.Off)
	lenB := len(req.B)
	rpc.PutBuffer(req.Get())
	log.Infof("Write: req ID %v Version %v len(B) %d, Off %d, reply %+v", req.ID, req.Version, lenB, req.Off, *reply)
	return nil
}
// Read reads from a tract.
func (h *TSSrvHandler) Read(req core.ReadReq, reply *core.ReadReply) error {
	op := h.opm.Start("Read")
	defer op.EndWithBlbError(&reply.Err)
	// Check failure service.
	if err := h.getFailure("Read"); err != core.NoError {
		log.Errorf("Read: failure service override, returning %s", err)
		*reply = core.ReadReply{Err: err}
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("Read: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	// Make sure the client can cancel this op by adding it to the in-flight table.
	ctx := h.inFlight.start(req.ReqID)
	if ctx == nil {
		log.Errorf("Read: new request w/existing ReqID, rejecting. req: %+v", req)
		return errBusy
	}
	defer h.inFlight.end(req.ReqID)
	// Actually do the op.
	ctx = contextWithPriority(ctx, mapPriority(req.Pri))
	var b []byte
	b, reply.Err = h.store.Read(ctx, req.ID, req.Version, req.Len, req.Off)
	reply.Set(b, true)
	log.Infof("Read: req %+v reply len %d Err %s", req, len(b), reply.Err)
	return nil
}
// StatTract returns the size of a tract.
func (h *TSSrvHandler) StatTract(req core.StatTractReq, reply *core.StatTractReply) error {
	op := h.opm.Start("StatTract")
	defer op.EndWithBlbError(&reply.Err)
	// Check failure service.
	if err := h.getFailure("StatTract"); err != core.NoError {
		log.Errorf("StatTract: failure service override, returning %s", err)
		*reply = core.StatTractReply{Err: err}
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("StatTract: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	ctx := contextWithPriority(context.Background(), mapPriority(req.Pri))
	reply.Size, reply.ModStamp, reply.Err = h.store.Stat(ctx, req.ID, req.Version)
	log.Infof("StatTract: req %+v reply %+v", req, *reply)
	return nil
}
// GetDiskInfo returns a summary of disk info in reply.
func (h *TSSrvHandler) GetDiskInfo(req core.GetDiskInfoReq, reply *core.GetDiskInfoReply) error {
	op := h.opm.Start("GetDiskInfo")
	defer op.EndWithBlbError(&reply.Err)
	// Check failure service.
	if err := h.getFailure("GetDiskInfo"); err != core.NoError {
		log.Errorf("GetDiskInfo: failure service override, returning %s", err)
		*reply = core.GetDiskInfoReply{Err: err}
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("GetDiskInfo: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	reply.Disks = h.server.getStatus()
	log.Infof("GetDiskInfo: req %+v reply %+v", req, *reply)
	return nil
}
// SetControlFlags changes control flags for a disk.
func (h *TSSrvHandler) SetControlFlags(req core.SetControlFlagsReq, reply *core.Error) error {
	op := h.opm.Start("SetControlFlags")
	defer op.EndWithBlbError(reply)
	// Check failure service.
	if err := h.getFailure("SetControlFlags"); err != core.NoError {
		log.Errorf("SetControlFlags: failure service override, returning %s", err)
		*reply = err
		return nil
	}
	// Check pending request limit.
	if !h.pendingSem.TryAcquire() {
		op.TooBusy()
		log.Errorf("SetControlFlags: too busy, rejecting req")
		return errBusy
	}
	defer h.pendingSem.Release()
	if req.Flags.DrainLocal > 0 {
		log.Errorf("SetControlFlags: DrainLocal not implemented yet")
		*reply = core.ErrInvalidArgument
	} else {
		*reply = h.store.SetControlFlags(req.Root, req.Flags)
	}
	log.Infof("SetControlFlags: req %+v reply %+v", req, *reply)
	return nil
}
// Cancel attempts to signal the pending operation identified by 'id' that it should be canceled.
func (h *TSSrvHandler) Cancel(id string, reply *core.Error) error {
	if h.inFlight.cancel(id) {
		*reply = core.NoError
	} else {
		*reply = core.ErrCancelFailed
	}
	return nil
}
func (h *TSSrvHandler) rpcStats() map[string]string {
	return h.opm.Strings(
		"CreateTract",
		"Read",
		"Write",
		"StatTract",
		"GetDiskInfo",
		"SetControlFlags",
	)
}
// Return the error registered with the given operation 'op', if any.
func (h *TSSrvHandler) getFailure(op string) core.Error {
	if h.opFailure == nil {
		return core.NoError
	}
	return h.opFailure.Get(op)
}
// opTracker keeps track of the contexts of active requests.
type opTracker struct {
	// Active requests.
	//
	// Maps from the client-supplied request ID to the function called to cancel the associated request.
	// Our current RPC mechanism doesn't allow access to the calling host:port, so we trust
	// the client to prepend their cancellation key with that. This is a bit risky as clients
	// can cancel each other.
	active map[string]context.CancelFunc
	lock   sync.Mutex
}
func newOpTracker() *opTracker {
	return &opTracker{active: make(map[string]context.CancelFunc)}
}
// start notes that we're starting a cancellable operation with id 'id'.
// returns 'nil' if the op is already running, in which case the caller should return an error 'up'.
func (s *opTracker) start(id string) context.Context {
	if len(id) == 0 {
		return context.Background()
	}
	s.lock.Lock()
	defer s.lock.Unlock()
	if _, ok := s.active[id]; ok {
		log.Errorf("uh oh, duplicate op ID: %s", id)
		return nil
	}
	ctx, cancel := context.WithCancel(context.Background())
	s.active[id] = cancel
	return ctx
}
// cancel cancels the op with the provided ID.
//
// Returns true if the op exists and was told to cancel.
// Returns false if the op doesn't exist.
func (s *opTracker) cancel(id string) bool {
	if len(id) == 0 {
		return false
	}
	s.lock.Lock()
	defer s.lock.Unlock()
	cancelFunc, ok := s.active[id]
	if ok {
		cancelFunc()
	}
	return ok
}
// end notes that we're ending a cancellable operation with id 'id'.
func (s *opTracker) end(id string) {
	if len(id) == 0 {
		return
	}
	s.lock.Lock()
	defer s.lock.Unlock()
	if _, ok := s.active[id]; !ok {
		log.Errorf("programming error! ending an op w/o a cancel func, id is %s", id)
	}
	delete(s.active, id)
}
...
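The getFailure helpers at the bottom of both handlers are what the rest of this file leans on: when the failure service is enabled, an OpFailure is registered with failures.Register, and every RPC handler first asks getFailure(opName) whether an error has been injected for that operation; if so, the handler short-circuits and returns that error in the reply without touching the store. A minimal, self-contained sketch of that pattern follows. It does not use the blb packages; the opFailureRegistry type, handleRead function, and error values below are illustrative stand-ins, not part of the blb API.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// opFailureRegistry is a hypothetical stand-in for server.OpFailure: it maps
// an operation name to the error that operation should fail with.
type opFailureRegistry struct {
	mu       sync.Mutex
	failures map[string]error
}

func newOpFailureRegistry() *opFailureRegistry {
	return &opFailureRegistry{failures: make(map[string]error)}
}

// Set registers an error to inject for the named operation.
func (r *opFailureRegistry) Set(op string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.failures[op] = err
}

// Get returns the injected error for 'op', or nil if none is registered.
func (r *opFailureRegistry) Get(op string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.failures[op]
}

// handleRead mirrors the handler pattern above: consult the failure registry
// first and short-circuit before doing any real work.
func handleRead(reg *opFailureRegistry) error {
	if err := reg.Get("Read"); err != nil {
		return fmt.Errorf("Read: failure service override: %w", err)
	}
	// ... the normal read path would go here ...
	return nil
}

func main() {
	reg := newOpFailureRegistry()
	fmt.Println(handleRead(reg)) // <nil>: no failure injected
	reg.Set("Read", errors.New("injected disk error"))
	fmt.Println(handleRead(reg)) // Read: failure service override: injected disk error
}

Because the check sits at the top of every handler and returns through the normal reply path, tests can flip a single registry entry to exercise each RPC's error handling without touching the storage layer.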


config_test.go

Source: config_test.go (GitHub)


...
	if act := config.GetParamNames(ctx); len(act) != 0 {
		t.Fatalf(`GetParamNames() = %v, wanted empty array`, act)
	}
}
func TestGetFailure(t *testing.T) {
	ctx := CLIContext{}
	if !config.GetFailure(ctx) {
		t.Fatal(`GetFailure() = false, wanted true`)
	}
}
func TestGetQuiet(t *testing.T) {
	ctx := CLIContext{}
	if !config.GetQuiet(ctx) {
		t.Fatal(`GetQuiet() = false, wanted true`)
	}
}
...
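The CLIContext value used by these tests is not shown in the excerpt. A plausible minimal test double, assuming the domain.CLIContext interface sketched in config.go below exposes only the Args and Bool methods the accessors call, might look like this; the field names and defaults here are guesses, and the real double presumably arranges for Bool to report true so that TestGetFailure and TestGetQuiet pass.

package config_test

// CLIContext is a hypothetical test double satisfying the methods that
// config.GetFailure and friends call on a domain.CLIContext.
type CLIContext struct {
	args  []string
	bools map[string]bool
}

// Args returns the positional arguments recorded in the double.
func (c CLIContext) Args() []string { return c.args }

// Bool reports the value recorded for a boolean flag.
func (c CLIContext) Bool(key string) bool { return c.bools[key] }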


config.go

Source: config.go (GitHub)


...
// GetParamNames returns the param names.
func GetParamNames(c domain.CLIContext) []string {
	return []string(c.Args())
}
// GetFailure returns failure option.
func GetFailure(c domain.CLIContext) bool {
	return c.Bool(failureKey)
}
// GetQuiet returns quiet option.
func GetQuiet(c domain.CLIContext) bool {
	return c.Bool(quietKey)
}
...
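Here GetFailure is just a typed accessor over the CLI context: it looks up a boolean flag by key. A short sketch of how a caller might branch on it is below; the CLIContext interface shape, the fakeCtx type, and the flag name "failure" are assumptions for illustration (the real failureKey constant and domain.CLIContext definition are not shown in the excerpt).

package main

import "fmt"

// CLIContext mirrors the small surface the config accessors need; in the real
// code this role is played by domain.CLIContext.
type CLIContext interface {
	Args() []string
	Bool(name string) bool
}

// failureKey is assumed here to be the name of a boolean --failure flag.
const failureKey = "failure"

// GetFailure returns the failure option, as in config.go above.
func GetFailure(c CLIContext) bool {
	return c.Bool(failureKey)
}

// fakeCtx is a tiny illustrative CLIContext implementation.
type fakeCtx struct{ flags map[string]bool }

func (f fakeCtx) Args() []string     { return nil }
func (f fakeCtx) Bool(n string) bool { return f.flags[n] }

func main() {
	ctx := fakeCtx{flags: map[string]bool{failureKey: true}}
	if GetFailure(ctx) {
		fmt.Println("failure mode requested: exit non-zero on problems")
	}
}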


GetFailure

Using AI Code Generation


// main package:
package main

import (
	"fmt"

	"internal" // this import is what fails; see the error below
)

func main() {
	fmt.Println("Hello, playground")
	internal.GetFailure()
}

// package internal (in a directory named internal):
package internal

import "fmt"

func GetFailure() {
	fmt.Println("Failure")
}

The compiler cannot resolve the "internal" import and reports the paths it searched:

	/usr/local/go/src/internal (from $GOROOT)
	/tmp/sandbox006718864/src/internal (from $GOPATH)

How can I import internal packages in Go?
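The error occurs because internal is a reserved path component in Go: a package under .../internal/... can only be imported by code rooted at the directory tree that contains the internal directory, and it cannot be imported by its bare name from an arbitrary location. A sketch of a layout that does compile is shown below; the module path example.com/demo and the package name failure are hypothetical.

// Layout (go.mod declares: module example.com/demo):
//
//   demo/
//     go.mod
//     main.go
//     internal/
//       failure/
//         failure.go

// internal/failure/failure.go
package failure

// GetFailure reports a failure message.
func GetFailure() string { return "Failure" }

// main.go -- allowed, because it lives under demo/, the parent of internal/.
package main

import (
	"fmt"

	"example.com/demo/internal/failure"
)

func main() {
	fmt.Println(failure.GetFailure())
}

Code outside the demo module (or outside the subtree rooted at demo/) cannot import example.com/demo/internal/failure at all; that restriction is enforced by the go tool, not by convention.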


GetFailure

Using AI Code Generation


package main

import (
	"fmt"

	// Placeholder path: an internal package can only be imported from within
	// its own module (see the internal-package layout shown earlier).
	"example.com/yourmodule/internal"
)

func main() {
	fmt.Println(internal.GetFailure())
}


GetFailure

Using AI Code Generation


// Caller (the import path for the internal package is omitted, as in the original):
func main() {
	var a = internal.GetFailure()
	fmt.Println(a)
}

// Stub in the internal package; the original left the body empty, which does
// not compile for a function that returns a string.
func GetFailure() string {
	return ""
}

// Test exercising the stub from within the same package:
import (
	"fmt"
	"testing"
)

func TestGetFailure(t *testing.T) {
	var a = GetFailure()
	fmt.Println(a)
}


GetFailure

Using AI Code Generation


// Caller (the internal import path is omitted, as in the original):
import (
	"fmt"
)

func main() {
	fmt.Println(internal.GetFailure())
}

// Accessors in the internal package (each import block in the original was
// left unfinished; only "fmt" is needed):
import (
	"fmt"
)

func GetFailure() string {
	return fmt.Sprintf("Failure")
}

func GetSuccess() string {
	return fmt.Sprintf("Success")
}

func GetResult() string {
	return fmt.Sprintf("Result")
}


GetFailure

Using AI Code Generation


// First program: prints an exported variable from the vis package
// (the vis import path is omitted in the original):
import (
	"fmt"
)

func main() {
	fmt.Println(vis.MyName)
	vis.PrintVar()
}

// Second program: prints the result of vis.GetFailure:
import (
	"fmt"
)

func main() {
	fmt.Println(vis.GetFailure())
}


GetFailure

Using AI Code Generation


import (
	"fmt"

	"internal" // as written in the original; see the note on internal packages above
)

func main() {
	fmt.Println(internal.GetFailure())
}

// Stub in the internal package; the original's empty body does not compile
// for a bool-returning function.
func GetFailure() bool {
	return false
}


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Ginkgo automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.
