How to use GetFailure method of internal Package

Best Ginkgo code snippet using internal.GetFailure

server.go

Source:server.go Github

copy

Full Screen

1// Copyright (c) 2015 Western Digital Corporation or its affiliates.  All rights reserved.2// SPDX-License-Identifier: MIT3package tractserver4import (5	"context"6	"errors"7	"net/http"8	"sync"9	"time"10	log "github.com/golang/glog"11	"github.com/westerndigitalcorporation/blb/internal/core"12	"github.com/westerndigitalcorporation/blb/internal/server"13	"github.com/westerndigitalcorporation/blb/pkg/failures"14	"github.com/westerndigitalcorporation/blb/pkg/rpc"15)16type curatorInfo struct {17	// These may be stale; a curator in one repl group could be scheduled on a machine,18	// die, get scheduled elsewhere, and a curator in a diff. repl group could be brought19	// up on that same machine.20	partitions []core.PartitionID21}22// Server is the RPC server for the TractStore23type Server struct {24	// The actual data storage.25	store *Store26	// Configuration parameters.27	cfg *Config28	// Aggregate connections to curators.29	ct CuratorTalker30	// The set of curators (indexed by address) that we heartbeat to.31	curators map[string]curatorInfo32	// Cached disk status.33	fsStatusCache []core.FsStatus34	// When was the cached disk status last updated.35	fsStatusCacheUpdated time.Time36	// Lock for 'curators', 'fsStatus' and 'fsStatusUpdated'.37	lock sync.Mutex38	// The connection to master.39	mc *RPCMasterConnection40	// Control handler.41	ctlHandler *TSCtlHandler42	// Service handler.43	srvHandler *TSSrvHandler44}45// NewServer creates a new Server. The server does not listen for or serve46// requests until Start() is called on it.47func NewServer(store *Store, ct CuratorTalker, mc *RPCMasterConnection, cfg *Config) *Server {48	return &Server{49		store:    store,50		ct:       ct,51		cfg:      cfg,52		curators: make(map[string]curatorInfo),53		mc:       mc,54	}55}56// Start starts the TractServer by launching goroutines to accept RPC requests.57func (s *Server) Start() (err error) {58	s.initTractserverID()59	// Set up status page.60	http.HandleFunc("/", s.statusHandler)61	//http.HandleFunc("/logs", health.HandleLogs)62	//http.HandleFunc("/loglevel", health.HandleLogsLevel)63	// Endpoint for shutting down the tractserver.64	http.HandleFunc("/_quit", server.QuitHandler)65	// Create control/service handlers.66	opm := server.NewOpMetric("tractserver_rpc", "rpc")67	s.ctlHandler = newTSCtlHandler(s, opm)68	s.srvHandler = newTSSrvHandler(s, opm)69	// Register the rpc handlers.70	if err = rpc.RegisterName("TSCtlHandler", s.ctlHandler); nil != err {71		return err72	}73	if err = rpc.RegisterName("TSSrvHandler", s.srvHandler); err != nil {74		return err75	}76	go s.masterHeartbeatLoop()77	go s.curatorHeartbeatLoop()78	log.Infof("tractserver id=%v listening on address %s", s.store.GetID(), s.cfg.Addr)79	err = http.ListenAndServe(s.cfg.Addr, nil) // this blocks forever80	log.Fatalf("http listener returned error: %v", err)81	return82}83// initTractserverID contacts the master to register this tractserver for service.84func (s *Server) initTractserverID() core.TractserverID {85	// We just started up and probably don't have any disks yet. Let's wait86	// until we add some disks.87	for seconds := 0; s.store.DiskCount() == 0; seconds++ {88		if seconds%60 == 3 {89			log.Infof("Waiting for disks to read TSID...")90		}91		time.Sleep(time.Second)92	}93	// Already registered.94	if id := s.store.GetID(); id.IsValid() {95		return id96	}97	// Check for manual override.98	if overrideID := core.TractserverID(s.cfg.OverrideID); overrideID.IsValid() {99		log.Infof("tractserver ID is being **OVERRIDDEN** to %d", overrideID)100		if _, err := s.store.SetID(overrideID); err != core.NoError {101			log.Fatalf("error storing OVERRIDDEN tractserver ID: %s", err)102		}103		return overrideID104	}105	// We haven't registered yet. Register with the master.106	for {107		reply, err := s.mc.RegisterTractserver(context.Background(), s.cfg.Addr)108		if err != core.NoError {109			log.Errorf("initTractserverID: failed to register tractserver with master, sleeping and retrying, err=%s", err)110			time.Sleep(s.cfg.RegistrationRetry)111			continue112		}113		log.Infof("registered TSID %s", reply.TSID)114		if id, err := s.store.SetID(reply.TSID); err != core.NoError {115			log.Fatalf("failed to persist the tractserver ID: %s", err)116		} else {117			return id118		}119	}120}121// masterHeartbeatLoop runs forever, sending heartbeats to the master.122func (s *Server) masterHeartbeatLoop() {123	masterTicker := time.NewTicker(s.cfg.MasterHeartbeatInterval)124	for {125		// Send the regularly scheduled heartbeat.126		beat, err := s.mc.MasterTractserverHeartbeat(context.Background(), s.store.GetID(), s.cfg.Addr, s.getStatusForHeartbeat())127		if err != core.NoError {128			log.Errorf("masterHeartbeatLoop: error sending heartbeat to master, sleeping and retrying, err=%s", err)129			time.Sleep(s.cfg.MasterHeartbeatRetry)130			continue131		}132		s.processMasterHeartbeat(beat)133		<-masterTicker.C134	}135}136// processMasterHeartbeat compares our view of active curators with the master's view,137// establishing connections to new curators and closing connections to stale curators.138func (s *Server) processMasterHeartbeat(beat core.MasterTractserverHeartbeatReply) {139	// Collect some stats for logging.140	alreadyConnected := 0141	newConnections := 0142	removedConnections := 0143	// We create a map of what's in the from-master heartbeat so that we can remove any144	// curators that aren't in it later.145	inBeat := make(map[string]bool)146	load := s.getLoad()147	// Using s.curators so we lock it.148	s.lock.Lock()149	for _, addr := range beat.Curators {150		inBeat[addr] = true151		if _, ok := s.curators[addr]; ok {152			alreadyConnected++153		} else {154			s.curators[addr] = curatorInfo{}155			go s.beatToCurator(nil, addr, nil, load)156			newConnections++157		}158	}159	for addr := range s.curators {160		if !inBeat[addr] {161			go s.ct.Close(addr)162			delete(s.curators, addr)163			removedConnections++164		}165	}166	s.lock.Unlock()167	if newConnections+removedConnections > 0 {168		log.Infof("processMasterHeartbeat: %d already connected to, %d new connections, %d dropped, total %d curator conns",169			alreadyConnected, newConnections, removedConnections, alreadyConnected+newConnections)170	}171}172// getLoad returns information about how loaded this TS is.173func (s *Server) getLoad() core.TractserverLoad {174	var load core.TractserverLoad175	for _, s := range s.getStatus() {176		load.NumTracts += s.NumTracts177		load.TotalSpace += s.TotalSpace178		// A disk's FS might have free space but we can't use it if the disk is unhealthy.179		if s.Status.Healthy && !s.Status.Full {180			load.AvailSpace += s.AvailSpace181		}182	}183	return load184}185// curatorHeartbeatLoop runs forever, regularly sending heartbeats to all curators in s.curators.186func (s *Server) curatorHeartbeatLoop() {187	// Maps from a partition ID to tracts that we store for that partition.188	// Used to send a subset of tracts to the right curator.189	var tractsByPart map[core.PartitionID][]core.TractID190	var shard uint64191	for range time.Tick(s.cfg.CuratorHeartbeatInterval) {192		// pickSubset removes a subset of tractsByPart.  When it's out of tracts we refresh it.193		if len(tractsByPart) == 0 {194			tractsByPart = s.store.GetSomeTractsByPartition(shard)195			shard++196		}197		// Send the same load vector to all curators.198		load := s.getLoad()199		// Protect s.curators as we iterate over it.200		s.lock.Lock()201		log.V(2).Infof("sending heartbeats to %d curators", len(s.curators))202		for c, ci := range s.curators {203			subset := s.pickSubset(ci.partitions, tractsByPart)204			go s.beatToCurator(ci.partitions, c, subset, load)205		}206		s.lock.Unlock()207	}208}209// Pick a subset of the tracts in 'all', up to tractsPerCuratorHeartbeat, that have a partition in 'parts'.210func (s *Server) pickSubset(parts []core.PartitionID, all map[core.PartitionID][]core.TractID) []core.TractID {211	var ret []core.TractID212	for _, p := range parts {213		budget := s.cfg.TractsPerCuratorHeartbeat - len(ret)214		if budget <= 0 {215			break216		} else if budget < len(all[p]) {217			ret = append(ret, all[p][0:budget]...)218			all[p] = all[p][budget:]219		} else {220			ret = append(ret, all[p]...)221			delete(all, p)222		}223	}224	return ret225}226// beatToCurator sents a heartbeat to the given curator.227func (s *Server) beatToCurator(parts []core.PartitionID, curatorAddr string, tracts []core.TractID, load core.TractserverLoad) {228	// Pull out what's corrupt and managed by this curator.229	beat := core.CuratorTractserverHeartbeatReq{230		TSID:    s.store.GetID(),231		Addr:    s.cfg.Addr,232		Corrupt: s.store.GetBadTracts(parts, s.cfg.BadTractsPerHeartbeat),233		Has:     tracts,234		Load:    load,235	}236	parts, err := s.ct.CuratorTractserverHeartbeat(curatorAddr, beat)237	if err != nil {238		log.Errorf("failed to beat to curator at %s, err=%s", curatorAddr, err)239		return240	}241	log.V(2).Infof("beat to curator at %s, sent %d corrupt, %d owned, got partitions %v",242		curatorAddr, len(beat.Corrupt), len(beat.Has), parts)243	// Update which partitions the curator owns and remove reported bad244	// tracts from failures map.245	s.lock.Lock()246	s.curators[curatorAddr] = curatorInfo{partitions: parts}247	s.store.removeTractsFromFailures(beat.Corrupt)248	s.lock.Unlock()249}250// getStatus retrieves disk status -- used by getLoad and status page. The251// caller should not mutate the returned slice.252func (s *Server) getStatus() []core.FsStatus {253	s.lock.Lock()254	defer s.lock.Unlock()255	// If the cached status is too stale, go down to the store layer to256	// retrieve the actual status and refresh the cache.257	if time.Now().After(s.fsStatusCacheUpdated.Add(s.cfg.DiskStatusCacheTTL)) {258		s.fsStatusCache = s.store.getStatus()259		s.fsStatusCacheUpdated = time.Now()260	}261	return s.fsStatusCache262}263// getStatusForHeartbeat is like getStatus, but clears some fields we don't need264// to send to the master.265func (s *Server) getStatusForHeartbeat() []core.FsStatus {266	disks := s.getStatus()267	out := make([]core.FsStatus, len(disks))268	for i, d := range disks {269		out[i] = d270		out[i].Ops = nil271	}272	return out273}274//-----------------275// Control handler276//-----------------277// TSCtlHandler handles all control messages.278type TSCtlHandler struct {279	// When failure service is enabled, what errors failed operations should return.280	opFailure *server.OpFailure281	// The server.282	server *Server283	// The actual data store.284	store *Store285	// The semaphore which is used to limit the number of pending requests.286	pendingSem server.Semaphore287	// Metrics we collect.288	opm *server.OpMetric289}290// newTSCtlHandler creates a new TSCtlHandler.291func newTSCtlHandler(s *Server, opm *server.OpMetric) *TSCtlHandler {292	cfg := s.cfg293	handler := &TSCtlHandler{294		server:     s,295		store:      s.store,296		pendingSem: server.NewSemaphore(cfg.RejectCtlReqThreshold),297		opm:        opm,298	}299	if cfg.UseFailure {300		handler.opFailure = server.NewOpFailure()301		if err := failures.Register("ts_control_failure", handler.opFailure.Handler); err != nil {302			log.Errorf("failed to register failure service: %s", err)303		}304	}305	return handler306}307// SetVersion sets the version of a tract.  This is done by the curator when it changes replication308// group membership.309func (h *TSCtlHandler) SetVersion(req core.SetVersionReq, reply *core.SetVersionReply) error {310	op := h.opm.Start("SetVersion")311	defer op.EndWithBlbError(&reply.Err)312	// Check failure service.313	if err := h.getFailure("SetVersion"); err != core.NoError {314		log.Errorf("SetVersion: failure service override, returning %s", err)315		*reply = core.SetVersionReply{Err: err}316		return nil317	}318	// Check pending request limit.319	if !h.pendingSem.TryAcquire() {320		op.TooBusy()321		log.Errorf("SetVersion: too busy, rejecting req")322		return errBusy323	}324	defer h.pendingSem.Release()325	if !h.store.HasID(req.TSID) {326		log.Infof("SetVersion: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())327		*reply = core.SetVersionReply{Err: core.ErrWrongTractserver}328		return nil329	}330	reply.NewVersion, reply.Err = h.store.SetVersion(req.ID, req.NewVersion, req.ConditionalStamp)331	log.Infof("SetVersion: req %+v reply %+v", req, *reply)332	return nil333}334// PullTract copies a tract from an existing tractserver.335func (h *TSCtlHandler) PullTract(req core.PullTractReq, reply *core.Error) error {336	op := h.opm.Start("PullTract")337	defer op.EndWithBlbError(reply)338	// Check failure service.339	if err := h.getFailure("PullTract"); err != core.NoError {340		log.Errorf("PullTract: failure service override, returning %s", err)341		*reply = err342		return nil343	}344	// Check pending request limit.345	if !h.pendingSem.TryAcquire() {346		op.TooBusy()347		log.Errorf("PullTract: too busy, rejecting req")348		return errBusy349	}350	defer h.pendingSem.Release()351	if !h.store.HasID(req.TSID) {352		log.Infof("PullTract: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())353		*reply = core.ErrWrongTractserver354		return nil355	}356	ctx := controlContext()357	*reply = h.store.PullTract(ctx, req.From, req.ID, req.Version)358	log.Infof("PullTract: req %+v reply %+v", req, *reply)359	return nil360}361// CheckTracts is a request from a curator to verify that we have the tracts the curator thinks we do.362func (h *TSCtlHandler) CheckTracts(req core.CheckTractsReq, reply *core.CheckTractsReply) error {363	op := h.opm.Start("CheckTracts")364	defer op.EndWithBlbError(&reply.Err)365	// Check failure service.366	if err := h.getFailure("CheckTracts"); err != core.NoError {367		log.Errorf("CheckTracts: failure service override, returning %s", err)368		*reply = core.CheckTractsReply{Err: err}369		return nil370	}371	// Check pending request limit.372	if !h.pendingSem.TryAcquire() {373		op.TooBusy()374		log.Errorf("CheckTracts: too busy, rejecting req")375		return errBusy376	}377	defer h.pendingSem.Release()378	if !h.store.HasID(req.TSID) {379		log.Infof("CheckTracts: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())380		*reply = core.CheckTractsReply{Err: core.ErrWrongTractserver}381		return nil382	}383	*reply = core.CheckTractsReply{Err: core.NoError}384	log.Infof("CheckTracts: req %d tracts reply %+v", len(req.Tracts), *reply)385	// Enqueue the check tracts request into the channel and the check will be386	// done by another goroutine.387	select {388	case h.store.checkTractsCh <- req.Tracts:389	case <-time.After(3 * time.Second):390		// Drop the check request after a certain timeout. This is fine given391		// curators will redo the check periodically.392		log.Errorf("Timeout on blocking on the check tracts channel, drop the CheckTracts request with %d tracts", len(req.Tracts))393		op.TooBusy()394	}395	return nil396}397// GCTract is a request to garbage collect the provided tracts as we don't need to store them anymore.398func (h *TSCtlHandler) GCTract(req core.GCTractReq, reply *core.Error) error {399	op := h.opm.Start("GCTract")400	defer op.EndWithBlbError(reply)401	if err := h.getFailure("GCTract"); err != core.NoError {402		log.Errorf("GCTract: failure service override, returning %s", err)403		*reply = err404		return nil405	}406	// Check pending request limit.407	if !h.pendingSem.TryAcquire() {408		op.TooBusy()409		log.Errorf("GCTract: too busy, rejecting req")410		return errBusy411	}412	defer h.pendingSem.Release()413	if !h.store.HasID(req.TSID) {414		log.Infof("GCTract: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())415		*reply = core.ErrWrongTractserver416		return nil417	}418	h.store.GCTracts(req.Old, req.Gone)419	*reply = core.NoError420	log.Infof("GCTract: req %+v reply %+v", req, *reply)421	return nil422}423// GetTSID returns the id of this tractserver. This is only used for testing424// purposes and not in production.425func (h *TSCtlHandler) GetTSID(req struct{}, reply *core.TractserverID) error {426	if id := h.store.GetID(); id.IsValid() {427		*reply = id428		return nil429	}430	return errors.New("unregistered tractserver")431}432// PackTracts instructs the tractserver to read a bunch of tracts from other433// tractservers and write them to one RS data chunk on the local tractserver.434func (h *TSCtlHandler) PackTracts(req core.PackTractsReq, reply *core.Error) error {435	op := h.opm.Start("PackTracts")436	defer op.EndWithBlbError(reply)437	if err := h.getFailure("PackTracts"); err != core.NoError {438		log.Errorf("PackTracts: failure service override, returning %s", err)439		*reply = err440		return nil441	}442	// Check pending request limit.443	if !h.pendingSem.TryAcquire() {444		op.TooBusy()445		log.Errorf("PackTracts: too busy, rejecting req")446		return errBusy447	}448	defer h.pendingSem.Release()449	if !h.store.HasID(req.TSID) {450		log.Infof("PackTracts: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())451		*reply = core.ErrWrongTractserver452		return nil453	}454	ctx := controlContext()455	*reply = h.store.PackTracts(ctx, req.Length, req.Tracts, req.ChunkID)456	log.Infof("PackTracts: req %+v reply %+v", req, *reply)457	return nil458}459// RSEncode instructs the tractserver to read a bunch of RS data chunks, perform460// the RS parity computation, and write out parity chunks to other tractservers.461func (h *TSCtlHandler) RSEncode(req core.RSEncodeReq, reply *core.Error) error {462	op := h.opm.Start("RSEncode")463	defer op.EndWithBlbError(reply)464	if err := h.getFailure("RSEncode"); err != core.NoError {465		log.Errorf("RSEncode: failure service override, returning %s", err)466		*reply = err467		return nil468	}469	// Check pending request limit.470	if !h.pendingSem.TryAcquire() {471		op.TooBusy()472		log.Errorf("RSEncode: too busy, rejecting req")473		return errBusy474	}475	defer h.pendingSem.Release()476	if !h.store.HasID(req.TSID) {477		log.Infof("RSEncode: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())478		*reply = core.ErrWrongTractserver479		return nil480	}481	ctx := controlContext()482	*reply = h.store.RSEncode(ctx, req.ChunkID, req.Length, req.Srcs, req.Dests, req.IndexMap)483	log.Infof("RSEncode: req %+v reply %+v", req, *reply)484	return nil485}486// CtlRead reads from a tract, bypassing request limits. CtlReads are not487// cancellable.488func (h *TSCtlHandler) CtlRead(req core.ReadReq, reply *core.ReadReply) error {489	op := h.opm.Start("CtlRead")490	defer op.EndWithBlbError(&reply.Err)491	if err := h.getFailure("CtlRead"); err != core.NoError {492		log.Errorf("CtlRead: failure service override, returning %s", err)493		*reply = core.ReadReply{Err: err}494		return nil495	}496	// Check pending request limit.497	if !h.pendingSem.TryAcquire() {498		op.TooBusy()499		log.Errorf("CtlRead: too busy, rejecting req")500		return errBusy501	}502	defer h.pendingSem.Release()503	ctx := controlContext()504	var b []byte505	b, reply.Err = h.store.Read(ctx, req.ID, req.Version, req.Len, req.Off)506	reply.Set(b, true)507	log.Infof("CtlRead: req %+v reply len %d Err %s", req, len(b), reply.Err)508	return nil509}510// CtlWrite does a write to a tract, bypassing request limits.511// Unlike regular Write, CtlWrite creates the tract when Off is zero.512func (h *TSCtlHandler) CtlWrite(req core.WriteReq, reply *core.Error) error {513	op := h.opm.Start("CtlWrite")514	defer op.EndWithBlbError(reply)515	if err := h.getFailure("CtlWrite"); err != core.NoError {516		log.Errorf("CtlWrite: failure service override, returning %s", err)517		*reply = err518		return nil519	}520	// Check pending request limit.521	if !h.pendingSem.TryAcquire() {522		op.TooBusy()523		log.Errorf("CtlWrite: too busy, rejecting req")524		return errBusy525	}526	defer h.pendingSem.Release()527	// Currently, we only support CtlWrite for RS chunks.528	if req.ID.Blob.Partition().Type() != core.RSPartition {529		*reply = core.ErrInvalidArgument530		return nil531	}532	// TODO: This feels a little messy. Is there a better way to do this?533	// (Without adding a whole new CtlCreate path?)534	ctx := controlContext()535	if req.Off == 0 {536		*reply = h.store.Create(ctx, req.ID, req.B, req.Off)537	} else {538		*reply = h.store.Write(ctx, req.ID, req.Version, req.B, req.Off)539	}540	lenB := len(req.B)541	rpc.PutBuffer(req.Get())542	log.Infof("CtlWrite: req ID %v Version %v len(B) %d, Off %d, reply %+v", req.ID, req.Version, lenB, req.Off, *reply)543	return nil544}545// CtlStatTract returns the size of a tract.546func (h *TSCtlHandler) CtlStatTract(req core.StatTractReq, reply *core.StatTractReply) error {547	op := h.opm.Start("CtlStatTract")548	defer op.EndWithBlbError(&reply.Err)549	// Check failure service.550	if err := h.getFailure("StatTract"); err != core.NoError {551		log.Errorf("StatTract: failure service override, returning %s", err)552		*reply = core.StatTractReply{Err: err}553		return nil554	}555	// Check pending request limit.556	if !h.pendingSem.TryAcquire() {557		op.TooBusy()558		log.Errorf("CtlStatTract: too busy, rejecting req")559		return errBusy560	}561	defer h.pendingSem.Release()562	ctx := controlContext()563	reply.Size, reply.ModStamp, reply.Err = h.store.Stat(ctx, req.ID, req.Version)564	log.Infof("CtlStatTract: req %+v reply %+v", req, *reply)565	return nil566}567func (h *TSCtlHandler) rpcStats() map[string]string {568	return h.opm.Strings(569		"CheckTracts",570		"GCTract",571		"PullTract",572		"SetVersion",573		"PackTracts",574		"RSEncode",575		"CtlRead",576		"CtlWrite",577		"CtlStatTract",578	)579}580// Return the error registered with the given operation 'op', if any.581func (h *TSCtlHandler) getFailure(op string) core.Error {582	if nil == h.opFailure {583		return core.NoError584	}585	return h.opFailure.Get(op)586}587//-----------------588// Service handler589//-----------------590// errBusy is returned if there are too many pending requests.591var errBusy = errors.New("the server is too busy to serve this request")592// TSSrvHandler handles all client requests.593type TSSrvHandler struct {594	// When failure service is enabled, what errors failed operations should return.595	opFailure *server.OpFailure596	// The server.597	server *Server598	// The actual data store.599	store *Store600	// The semaphore which is used to limit the number of pending requests.601	pendingSem server.Semaphore602	// Per-RPC info.603	opm *server.OpMetric604	inFlight *opTracker605}606// newTSSrvHandler creates a new TSSrvHandler.607func newTSSrvHandler(s *Server, opm *server.OpMetric) *TSSrvHandler {608	cfg := s.cfg609	handler := &TSSrvHandler{610		server:     s,611		store:      s.store,612		pendingSem: server.NewSemaphore(cfg.RejectReqThreshold),613		opm:        opm,614		inFlight:   newOpTracker(),615	}616	if cfg.UseFailure {617		handler.opFailure = server.NewOpFailure()618		if err := failures.Register("ts_service_failure", handler.opFailure.Handler); err != nil {619			log.Errorf("failed to register failure service: %s", err)620		}621	}622	return handler623}624// CreateTract creates a tract and writes data to it. Upon success, the tract625// can be further Read/Write'd, and will have version 1.626func (h *TSSrvHandler) CreateTract(req core.CreateTractReq, reply *core.Error) error {627	op := h.opm.Start("CreateTract")628	defer op.EndWithBlbError(reply)629	// Check failure service.630	if err := h.getFailure("CreateTract"); err != core.NoError {631		log.Errorf("Create: failure service override, returning %s", err)632		*reply = err633		return nil634	}635	// Check pending request limit.636	if !h.pendingSem.TryAcquire() {637		op.TooBusy()638		log.Errorf("CreateTract: too busy, rejecting req")639		return errBusy640	}641	defer h.pendingSem.Release()642	if !h.store.HasID(req.TSID) {643		log.Infof("CreateTract: request has tsid %d, i have %d, rejecting", req.TSID, h.store.GetID())644		*reply = core.ErrWrongTractserver645		return nil646	}647	ctx := contextWithPriority(context.Background(), mapPriority(req.Pri))648	*reply = h.store.Create(ctx, req.ID, req.B, req.Off)649	lenB := len(req.B)650	rpc.PutBuffer(req.Get())651	log.Infof("Create: req ID %v len(B) %d, Off %d, reply %+v", req.ID, lenB, req.Off, *reply)652	return nil653}654// Write does a write to a tract on this tractserver.655func (h *TSSrvHandler) Write(req core.WriteReq, reply *core.Error) error {656	op := h.opm.Start("Write")657	defer op.EndWithBlbError(reply)658	// Check failure service.659	if err := h.getFailure("Write"); err != core.NoError {660		log.Errorf("Write: failure service override, returning %s", err)661		*reply = err662		return nil663	}664	// Check pending request limit.665	if !h.pendingSem.TryAcquire() {666		op.TooBusy()667		log.Errorf("Write: too busy, rejecting req")668		return errBusy669	}670	defer h.pendingSem.Release()671	// Make sure the client can cancel this op by adding it to the in-flight table.672	ctx := h.inFlight.start(req.ReqID)673	if ctx == nil {674		log.Errorf("Write: new request w/existing ReqID, rejecting.  req: %+v", req)675		return errBusy676	}677	defer h.inFlight.end(req.ReqID)678	ctx = contextWithPriority(ctx, mapPriority(req.Pri))679	*reply = h.store.Write(ctx, req.ID, req.Version, req.B, req.Off)680	lenB := len(req.B)681	rpc.PutBuffer(req.Get())682	log.Infof("Write: req ID %v Version %v len(B) %d, Off %d, reply %+v", req.ID, req.Version, lenB, req.Off, *reply)683	return nil684}685// Read reads from a tract.686func (h *TSSrvHandler) Read(req core.ReadReq, reply *core.ReadReply) error {687	op := h.opm.Start("Read")688	defer op.EndWithBlbError(&reply.Err)689	// Check failure service.690	if err := h.getFailure("Read"); err != core.NoError {691		log.Errorf("Read: failure service override, returning %s", err)692		*reply = core.ReadReply{Err: err}693		return nil694	}695	// Check pending request limit.696	if !h.pendingSem.TryAcquire() {697		op.TooBusy()698		log.Errorf("Read: too busy, rejecting req")699		return errBusy700	}701	defer h.pendingSem.Release()702	// Make sure the client can cancel this op by adding it to the in-flight table.703	ctx := h.inFlight.start(req.ReqID)704	if ctx == nil {705		log.Errorf("Read: new request w/existing ReqID, rejecting.  req: %+v", req)706		return errBusy707	}708	defer h.inFlight.end(req.ReqID)709	// Actually do the op.710	ctx = contextWithPriority(ctx, mapPriority(req.Pri))711	var b []byte712	b, reply.Err = h.store.Read(ctx, req.ID, req.Version, req.Len, req.Off)713	reply.Set(b, true)714	log.Infof("Read: req %+v reply len %d Err %s", req, len(b), reply.Err)715	return nil716}717// StatTract returns the size of a tract.718func (h *TSSrvHandler) StatTract(req core.StatTractReq, reply *core.StatTractReply) error {719	op := h.opm.Start("StatTract")720	defer op.EndWithBlbError(&reply.Err)721	// Check failure service.722	if err := h.getFailure("StatTract"); err != core.NoError {723		log.Errorf("StatTract: failure service override, returning %s", err)724		*reply = core.StatTractReply{Err: err}725		return nil726	}727	// Check pending request limit.728	if !h.pendingSem.TryAcquire() {729		op.TooBusy()730		log.Errorf("StatTract: too busy, rejecting req")731		return errBusy732	}733	defer h.pendingSem.Release()734	ctx := contextWithPriority(context.Background(), mapPriority(req.Pri))735	reply.Size, reply.ModStamp, reply.Err = h.store.Stat(ctx, req.ID, req.Version)736	log.Infof("StatTract: req %+v reply %+v", req, *reply)737	return nil738}739// GetDiskInfo returns a summary of disk info in reply.740func (h *TSSrvHandler) GetDiskInfo(req core.GetDiskInfoReq, reply *core.GetDiskInfoReply) error {741	op := h.opm.Start("GetDiskInfo")742	defer op.EndWithBlbError(&reply.Err)743	// Check failure service.744	if err := h.getFailure("GetDiskInfo"); err != core.NoError {745		log.Errorf("GetDiskInfo: failure service override, returning %s", err)746		*reply = core.GetDiskInfoReply{Err: err}747		return nil748	}749	// Check pending request limit.750	if !h.pendingSem.TryAcquire() {751		op.TooBusy()752		log.Errorf("GetDiskInfo: too busy, rejecting req")753		return errBusy754	}755	defer h.pendingSem.Release()756	reply.Disks = h.server.getStatus()757	log.Infof("GetDiskInfo: req %+v reply %+v", req, *reply)758	return nil759}760// SetControlFlags changes control flags for a disk.761func (h *TSSrvHandler) SetControlFlags(req core.SetControlFlagsReq, reply *core.Error) error {762	op := h.opm.Start("SetControlFlags")763	defer op.EndWithBlbError(reply)764	// Check failure service.765	if err := h.getFailure("SetControlFlags"); err != core.NoError {766		log.Errorf("SetControlFlags: failure service override, returning %s", err)767		*reply = err768		return nil769	}770	// Check pending request limit.771	if !h.pendingSem.TryAcquire() {772		op.TooBusy()773		log.Errorf("SetControlFlags: too busy, rejecting req")774		return errBusy775	}776	defer h.pendingSem.Release()777	if req.Flags.DrainLocal > 0 {778		log.Errorf("SetControlFlags: DrainLocal not implemented yet")779		*reply = core.ErrInvalidArgument780	} else {781		*reply = h.store.SetControlFlags(req.Root, req.Flags)782	}783	log.Infof("SetControlFlags: req %+v reply %+v", req, *reply)784	return nil785}786// Cancel attempts to signal the pending operation identified by 'id' that it should be canceled.787func (h *TSSrvHandler) Cancel(id string, reply *core.Error) error {788	if h.inFlight.cancel(id) {789		*reply = core.NoError790	} else {791		*reply = core.ErrCancelFailed792	}793	return nil794}795func (h *TSSrvHandler) rpcStats() map[string]string {796	return h.opm.Strings(797		"CreateTract",798		"Read",799		"Write",800		"StatTract",801		"GetDiskInfo",802		"SetControlFlags",803	)804}805// Return the error registered with the given operation 'op', if any.806func (h *TSSrvHandler) getFailure(op string) core.Error {807	if h.opFailure == nil {808		return core.NoError809	}810	return h.opFailure.Get(op)811}812// opTracker keeps track of the contexts of active requests.813type opTracker struct {814	// Active requests.815	//816	// Maps from the client-supplied request ID to the function called to cancel the associated request.817	// Our current RPC mechanism doesn't allow access to the calling host:port, so we trust818	// the client to prepend their cancellation key with that.  This is a bit risky as clients819	// can cancel each other.820	active map[string]context.CancelFunc821	lock sync.Mutex822}823func newOpTracker() *opTracker {824	return &opTracker{active: make(map[string]context.CancelFunc)}825}826// start notes that we're starting a cancellable operation with id 'id'.827// returns 'nil' if the op is already running, in which case the caller should return an error 'up'.828func (s *opTracker) start(id string) context.Context {829	if len(id) == 0 {830		return context.Background()831	}832	s.lock.Lock()833	defer s.lock.Unlock()834	if _, ok := s.active[id]; ok {835		log.Errorf("uh oh, duplicate op ID: %s", id)836		return nil837	}838	ctx, cancel := context.WithCancel(context.Background())839	s.active[id] = cancel840	return ctx841}842// cancel cancels the op with the provided ID.843//844// Returns true if the op exists and was told to cancel.845// Returns false if the op doesn't exist.846func (s *opTracker) cancel(id string) bool {847	if len(id) == 0 {848		return false849	}850	s.lock.Lock()851	defer s.lock.Unlock()852	cancelFunc, ok := s.active[id]853	if ok {854		cancelFunc()855	}856	return ok857}858// end notes that we're ending a cancellable operation with id 'id'.859func (s *opTracker) end(id string) {860	if len(id) == 0 {861		return862	}863	s.lock.Lock()864	defer s.lock.Unlock()865	if _, ok := s.active[id]; !ok {866		log.Errorf("programming error!  ending an op w/o a cancel func, id is %s", id)867	}868	delete(s.active, id)869}...

Full Screen

Full Screen

config_test.go

Source:config_test.go Github

copy

Full Screen

...42	if act := config.GetParamNames(ctx); len(act) != 0 {43		t.Fatalf(`GetParamNames() = %v, wanted empty array`, act)44	}45}46func TestGetFailure(t *testing.T) {47	ctx := CLIContext{}48	if !config.GetFailure(ctx) {49		t.Fatal(`GetFailure() = false, wanted true`)50	}51}52func TestGetQuiet(t *testing.T) {53	ctx := CLIContext{}54	if !config.GetQuiet(ctx) {55		t.Fatal(`GetQuiet() = false, wanted true`)56	}57}...

Full Screen

Full Screen

config.go

Source:config.go Github

copy

Full Screen

...24// GetParamNames returns the param names.25func GetParamNames(c domain.CLIContext) []string {26	return []string(c.Args())27}28// GetFailure returns failure option.29func GetFailure(c domain.CLIContext) bool {30	return c.Bool(failureKey)31}32// GetQuiet returns quiet option.33func GetQuiet(c domain.CLIContext) bool {34	return c.Bool(quietKey)35}...

Full Screen

Full Screen

GetFailure

Using AI Code Generation

copy

Full Screen

1import (2func main() {3    fmt.Println("Hello, playground")4    internal.GetFailure()5}6import (7func GetFailure() {8    fmt.Println("Failure")9}10        /usr/local/go/src/internal (from $GOROOT)11        /tmp/sandbox006718864/src/internal (from $GOPATH)12How can I import internal packages in Go?

Full Screen

Full Screen

GetFailure

Using AI Code Generation

copy

Full Screen

1import (2func main() {3	fmt.Println(internal.GetFailure())4}5import (6func main() {7	fmt.Println(internal.GetFailure())8}9import (10func main() {11	fmt.Println(internal.GetFailure())12}13import (14func main() {15	fmt.Println(internal.GetFailure())16}17import (18func main() {19	fmt.Println(internal.GetFailure())20}21import (22func main() {23	fmt.Println(internal.GetFailure())24}25import (26func main() {27	fmt.Println(internal.GetFailure())28}29import (

Full Screen

Full Screen

GetFailure

Using AI Code Generation

copy

Full Screen

1func main() {2    var a = internal.GetFailure()3    fmt.Println(a)4}5func GetFailure() string {6}7import (8func TestGetFailure(t *testing.T) {9    var a = GetFailure()10    fmt.Println(a)11}12func GetFailure() string {13}14import (15func TestGetFailure(t *testing.T) {16    var a = GetFailure()17    fmt.Println(a)18}19func GetFailure() string {20}21import (22func TestGetFailure(t *testing.T) {23    var a = GetFailure()24    fmt.Println(a)25}26func GetFailure() string {27}28import (29func TestGetFailure(t *testing.T) {30    var a = GetFailure()31    fmt.Println(a)32}33func GetFailure() string {34}35import (36func TestGetFailure(t *testing.T) {37    var a = GetFailure()38    fmt.Println(a)39}40func GetFailure() string {41}42import (43func TestGetFailure(t *testing.T) {44    var a = GetFailure()45    fmt.Println(a)46}47func GetFailure() string {48}

Full Screen

Full Screen

GetFailure

Using AI Code Generation

copy

Full Screen

1import (2func main() {3    fmt.Println(internal.GetFailure())4}5import (6func GetFailure() string {7    return fmt.Sprintf("Failure")8}9import (10func GetSuccess() string {11    return fmt.Sprintf("Success")12}13import (14func GetResult() string {15    return fmt.Sprintf("Result")16}17import (18func GetResult() string {19    return fmt.Sprintf("Result")20}21import (22func GetResult() string {23    return fmt.Sprintf("Result")24}25import (26func GetResult() string {27    return fmt.Sprintf("Result")28}29import (30func GetResult() string {31    return fmt.Sprintf("Result")32}33import (34func GetResult() string {35    return fmt.Sprintf("Result")36}37import (38func GetResult() string {39    return fmt.Sprintf("Result")40}41import (42func GetResult() string {43    return fmt.Sprintf("Result")44}45import (46func GetResult() string

Full Screen

Full Screen

GetFailure

Using AI Code Generation

copy

Full Screen

1import (2func main() {3	fmt.Println(vis.MyName)4	vis.PrintVar()5}6import (7func main() {8	fmt.Println(vis.GetFailure())9}

Full Screen

Full Screen

GetFailure

Using AI Code Generation

copy

Full Screen

1import (2func main() {3	fmt.Println(internal.GetFailure())4}5func GetFailure() bool {6}7import "internal"8func main() {9	fmt.Println(internal.GetFailure())10}11func GetFailure() bool {12}13import "internal"14func main() {15	fmt.Println(internal.GetFailure())16}17func GetFailure() bool {18}19import "internal"20func main() {21	fmt.Println(internal.GetFailure())22}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Ginkgo automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Most used method in

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful