How to use KillProcess method of lang Package

Best Gauge code snippet using lang.KillProcess

api.go

Source:api.go Github

copy

Full Screen

1// Unless explicitly stated otherwise all files in this repository are licensed2// under the Apache License Version 2.0.3// This product includes software developed at Datadog (https://www.datadoghq.com/).4// Copyright 2016-present Datadog, Inc.5package api6import (7 "bytes"8 "context"9 "encoding/json"10 "expvar"11 "fmt"12 "io"13 "io/ioutil"14 stdlog "log"15 "math"16 "mime"17 "net"18 "net/http"19 "net/http/pprof"20 "os"21 "runtime"22 "sort"23 "strconv"24 "strings"25 "sync"26 "time"27 "github.com/tinylib/msgp/msgp"28 "github.com/DataDog/datadog-agent/pkg/trace/api/apiutil"29 "github.com/DataDog/datadog-agent/pkg/trace/appsec"30 "github.com/DataDog/datadog-agent/pkg/trace/config"31 "github.com/DataDog/datadog-agent/pkg/trace/config/features"32 "github.com/DataDog/datadog-agent/pkg/trace/info"33 "github.com/DataDog/datadog-agent/pkg/trace/log"34 "github.com/DataDog/datadog-agent/pkg/trace/metrics"35 "github.com/DataDog/datadog-agent/pkg/trace/metrics/timing"36 "github.com/DataDog/datadog-agent/pkg/trace/pb"37 "github.com/DataDog/datadog-agent/pkg/trace/sampler"38 "github.com/DataDog/datadog-agent/pkg/trace/watchdog"39)40var bufferPool = sync.Pool{41 New: func() interface{} {42 return new(bytes.Buffer)43 },44}45func getBuffer() *bytes.Buffer {46 buffer := bufferPool.Get().(*bytes.Buffer)47 buffer.Reset()48 return buffer49}50func putBuffer(buffer *bytes.Buffer) {51 bufferPool.Put(buffer)52}53// HTTPReceiver is a collector that uses HTTP protocol and just holds54// a chan where the spans received are sent one by one55type HTTPReceiver struct {56 Stats *info.ReceiverStats57 RateLimiter *rateLimiter58 out chan *Payload59 conf *config.AgentConfig60 dynConf *sampler.DynamicConfig61 server *http.Server62 statsProcessor StatsProcessor63 appsecHandler http.Handler64 rateLimiterResponse int // HTTP status code when refusing65 wg sync.WaitGroup // waits for all requests to be processed66 exit chan struct{}67}68// NewHTTPReceiver returns a pointer to a new HTTPReceiver69func 
NewHTTPReceiver(conf *config.AgentConfig, dynConf *sampler.DynamicConfig, out chan *Payload, statsProcessor StatsProcessor) *HTTPReceiver {70 rateLimiterResponse := http.StatusOK71 if features.Has("429") {72 rateLimiterResponse = http.StatusTooManyRequests73 }74 appsecHandler, err := appsec.NewIntakeReverseProxy(conf)75 if err != nil {76 log.Errorf("Could not instantiate AppSec: %v", err)77 }78 return &HTTPReceiver{79 Stats: info.NewReceiverStats(),80 RateLimiter: newRateLimiter(),81 out: out,82 statsProcessor: statsProcessor,83 conf: conf,84 dynConf: dynConf,85 appsecHandler: appsecHandler,86 rateLimiterResponse: rateLimiterResponse,87 exit: make(chan struct{}),88 }89}90func (r *HTTPReceiver) buildMux() *http.ServeMux {91 mux := http.NewServeMux()92 hash, infoHandler := r.makeInfoHandler()93 r.attachDebugHandlers(mux)94 for _, e := range endpoints {95 if e.IsEnabled != nil && !e.IsEnabled(r.conf) {96 continue97 }98 mux.Handle(e.Pattern, replyWithVersion(hash, e.Handler(r)))99 }100 mux.HandleFunc("/info", infoHandler)101 return mux102}103// replyWithVersion returns an http.Handler which calls h with an addition of some104// HTTP headers containing version and state information.105func replyWithVersion(hash string, h http.Handler) http.Handler {106 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {107 w.Header().Set("Datadog-Agent-Version", info.Version)108 w.Header().Set("Datadog-Agent-State", hash)109 h.ServeHTTP(w, r)110 })111}112// Start starts doing the HTTP server and is ready to receive traces113func (r *HTTPReceiver) Start() {114 if r.conf.ReceiverPort == 0 {115 log.Debug("HTTP receiver disabled by config (apm_config.receiver_port: 0).")116 return117 }118 timeout := 5 * time.Second119 if r.conf.ReceiverTimeout > 0 {120 timeout = time.Duration(r.conf.ReceiverTimeout) * time.Second121 }122 httpLogger := log.NewThrottled(5, 10*time.Second) // limit to 5 messages every 10 seconds123 r.server = &http.Server{124 ReadTimeout: timeout,125 
WriteTimeout: timeout,126 ErrorLog: stdlog.New(httpLogger, "http.Server: ", 0),127 Handler: r.buildMux(),128 }129 addr := fmt.Sprintf("%s:%d", r.conf.ReceiverHost, r.conf.ReceiverPort)130 ln, err := r.listenTCP(addr)131 if err != nil {132 killProcess("Error creating tcp listener: %v", err)133 }134 go func() {135 defer watchdog.LogOnPanic()136 if err := r.server.Serve(ln); err != nil && err != http.ErrServerClosed {137 log.Errorf("Could not start HTTP server: %v. HTTP receiver disabled.", err)138 }139 }()140 log.Infof("Listening for traces at http://%s", addr)141 if path := r.conf.ReceiverSocket; path != "" {142 ln, err := r.listenUnix(path)143 if err != nil {144 killProcess("Error creating UDS listener: %v", err)145 }146 go func() {147 defer watchdog.LogOnPanic()148 if err := r.server.Serve(ln); err != nil && err != http.ErrServerClosed {149 log.Errorf("Could not start UDS server: %v. UDS receiver disabled.", err)150 }151 }()152 log.Infof("Listening for traces at unix://%s", path)153 }154 if path := r.conf.WindowsPipeName; path != "" {155 pipepath := `\\.\pipe\` + path156 bufferSize := r.conf.PipeBufferSize157 secdec := r.conf.PipeSecurityDescriptor158 ln, err := listenPipe(pipepath, secdec, bufferSize)159 if err != nil {160 killProcess("Error creating %q named pipe: %v", pipepath, err)161 }162 go func() {163 defer watchdog.LogOnPanic()164 if err := r.server.Serve(ln); err != nil && err != http.ErrServerClosed {165 log.Errorf("Could not start Windows Pipes server: %v. Windows Pipes receiver disabled.", err)166 }167 }()168 log.Infof("Listening for traces on Windowes pipe %q. 
Security descriptor is %q", pipepath, secdec)169 }170 go r.RateLimiter.Run()171 go func() {172 defer watchdog.LogOnPanic()173 r.loop()174 }()175}176func (r *HTTPReceiver) attachDebugHandlers(mux *http.ServeMux) {177 mux.HandleFunc("/debug/pprof/", pprof.Index)178 mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)179 mux.HandleFunc("/debug/pprof/profile", pprof.Profile)180 mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)181 mux.HandleFunc("/debug/pprof/trace", pprof.Trace)182 mux.HandleFunc("/debug/blockrate", func(w http.ResponseWriter, r *http.Request) {183 // this endpoint calls runtime.SetBlockProfileRate(v), where v is an optional184 // query string parameter defaulting to 10000 (1 sample per 10μs blocked).185 rate := 10000186 v := r.URL.Query().Get("v")187 if v != "" {188 n, err := strconv.Atoi(v)189 if err != nil {190 http.Error(w, "v must be an integer", http.StatusBadRequest)191 return192 }193 rate = n194 }195 runtime.SetBlockProfileRate(rate)196 fmt.Fprintf(w, "Block profile rate set to %d. 
It will automatically be disabled again after calling /debug/pprof/block\n", rate)197 })198 mux.HandleFunc("/debug/pprof/block", func(w http.ResponseWriter, r *http.Request) {199 // serve the block profile and reset the rate to 0.200 pprof.Handler("block").ServeHTTP(w, r)201 runtime.SetBlockProfileRate(0)202 })203 mux.Handle("/debug/vars", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {204 // allow the GUI to call this endpoint so that the status can be reported205 w.Header().Set("Access-Control-Allow-Origin", "http://127.0.0.1:"+r.conf.GUIPort)206 expvar.Handler().ServeHTTP(w, req)207 }))208}209// listenUnix returns a net.Listener listening on the given "unix" socket path.210func (r *HTTPReceiver) listenUnix(path string) (net.Listener, error) {211 fi, err := os.Stat(path)212 if err == nil {213 // already exists214 if fi.Mode()&os.ModeSocket == 0 {215 return nil, fmt.Errorf("cannot reuse %q; not a unix socket", path)216 }217 if err := os.Remove(path); err != nil {218 return nil, fmt.Errorf("unable to remove stale socket: %v", err)219 }220 }221 ln, err := net.Listen("unix", path)222 if err != nil {223 return nil, err224 }225 if err := os.Chmod(path, 0o722); err != nil {226 return nil, fmt.Errorf("error setting socket permissions: %v", err)227 }228 return NewMeasuredListener(ln, "uds_connections"), err229}230// listenTCP creates a new net.Listener on the provided TCP address.231func (r *HTTPReceiver) listenTCP(addr string) (net.Listener, error) {232 tcpln, err := net.Listen("tcp", addr)233 if err != nil {234 return nil, err235 }236 if climit := r.conf.ConnectionLimit; climit > 0 {237 ln, err := newRateLimitedListener(tcpln, climit)238 go func() {239 defer watchdog.LogOnPanic()240 ln.Refresh(climit)241 }()242 return ln, err243 }244 return NewMeasuredListener(tcpln, "tcp_connections"), err245}246// Stop stops the receiver and shuts down the HTTP server.247func (r *HTTPReceiver) Stop() error {248 if r.conf.ReceiverPort == 0 {249 return nil250 }251 
r.exit <- struct{}{}252 <-r.exit253 r.RateLimiter.Stop()254 expiry := time.Now().Add(5 * time.Second) // give it 5 seconds255 ctx, cancel := context.WithDeadline(context.Background(), expiry)256 defer cancel()257 if err := r.server.Shutdown(ctx); err != nil {258 return err259 }260 r.wg.Wait()261 close(r.out)262 return nil263}264func (r *HTTPReceiver) handleWithVersion(v Version, f func(Version, http.ResponseWriter, *http.Request)) http.HandlerFunc {265 return func(w http.ResponseWriter, req *http.Request) {266 if mediaType := getMediaType(req); mediaType == "application/msgpack" && (v == v01 || v == v02) {267 // msgpack is only supported for versions >= v0.3268 httpFormatError(w, v, fmt.Errorf("unsupported media type: %q", mediaType))269 return270 }271 // TODO(x): replace with http.MaxBytesReader?272 req.Body = apiutil.NewLimitedReader(req.Body, r.conf.MaxRequestBytes)273 f(v, w, req)274 }275}276var errInvalidHeaderTraceCountValue = fmt.Errorf("%q header value is not a number", headerTraceCount)277func traceCount(req *http.Request) (int64, error) {278 str := req.Header.Get(headerTraceCount)279 if str == "" {280 return 0, fmt.Errorf("HTTP header %q not found", headerTraceCount)281 }282 n, err := strconv.Atoi(str)283 if err != nil {284 return 0, errInvalidHeaderTraceCountValue285 }286 return int64(n), nil287}288const (289 // headerTraceCount is the header client implementation should fill290 // with the number of traces contained in the payload.291 headerTraceCount = "X-Datadog-Trace-Count"292 // headerContainerID specifies the name of the header which contains the ID of the293 // container where the request originated.294 headerContainerID = "Datadog-Container-ID"295 // headerLang specifies the name of the header which contains the language from296 // which the traces originate.297 headerLang = "Datadog-Meta-Lang"298 // headerLangVersion specifies the name of the header which contains the origin299 // language's version.300 headerLangVersion = 
"Datadog-Meta-Lang-Version"301 // headerLangInterpreter specifies the name of the HTTP header containing information302 // about the language interpreter, where applicable.303 headerLangInterpreter = "Datadog-Meta-Lang-Interpreter"304 // headerLangInterpreterVendor specifies the name of the HTTP header containing information305 // about the language interpreter vendor, where applicable.306 headerLangInterpreterVendor = "Datadog-Meta-Lang-Interpreter-Vendor"307 // headerTracerVersion specifies the name of the header which contains the version308 // of the tracer sending the payload.309 headerTracerVersion = "Datadog-Meta-Tracer-Version"310 // headerComputedTopLevel specifies that the client has marked top-level spans, when set.311 // Any non-empty value will mean 'yes'.312 headerComputedTopLevel = "Datadog-Client-Computed-Top-Level"313 // headerComputedStats specifies whether the client has computed stats so that the agent314 // doesn't have to.315 headerComputedStats = "Datadog-Client-Computed-Stats"316 // headderDroppedP0Traces contains the number of P0 trace chunks dropped by the client.317 // This value is used to adjust priority rates computed by the agent.318 headerDroppedP0Traces = "Datadog-Client-Dropped-P0-Traces"319 // headderDroppedP0Spans contains the number of P0 spans dropped by the client.320 // This value is used for metrics and could be used in the future to adjust priority rates.321 headerDroppedP0Spans = "Datadog-Client-Dropped-P0-Spans"322 // headerRatesPayloadVersion contains the version of sampling rates.323 // If both agent and client have the same version, the agent won't return rates in API response.324 headerRatesPayloadVersion = "Datadog-Rates-Payload-Version"325 // tagContainersTags specifies the name of the tag which holds key/value326 // pairs representing information about the container (Docker, EC2, etc).327 tagContainersTags = "_dd.tags.container"328)329// TagStats returns the stats and tags coinciding with the information found in 
header.330// For more information, check the "Datadog-Meta-*" HTTP headers defined in this file.331func (r *HTTPReceiver) TagStats(v Version, header http.Header) *info.TagStats {332 return r.tagStats(v, header)333}334func (r *HTTPReceiver) tagStats(v Version, header http.Header) *info.TagStats {335 return r.Stats.GetTagStats(info.Tags{336 Lang: header.Get(headerLang),337 LangVersion: header.Get(headerLangVersion),338 Interpreter: header.Get(headerLangInterpreter),339 LangVendor: header.Get(headerLangInterpreterVendor),340 TracerVersion: header.Get(headerTracerVersion),341 EndpointVersion: string(v),342 })343}344// decodeTracerPayload decodes the payload in http request `req`.345// - tp is the decoded payload346// - ranHook reports whether the decoder was able to run the pb.MetaHook347// - err is the first error encountered348func decodeTracerPayload(v Version, req *http.Request, ts *info.TagStats) (tp *pb.TracerPayload, ranHook bool, err error) {349 switch v {350 case v01:351 var spans []pb.Span352 if err = json.NewDecoder(req.Body).Decode(&spans); err != nil {353 return nil, false, err354 }355 return &pb.TracerPayload{356 LanguageName: ts.Lang,357 LanguageVersion: ts.LangVersion,358 ContainerID: req.Header.Get(headerContainerID),359 Chunks: traceChunksFromSpans(spans),360 TracerVersion: ts.TracerVersion,361 }, false, nil362 case v05:363 buf := getBuffer()364 defer putBuffer(buf)365 if _, err = io.Copy(buf, req.Body); err != nil {366 return nil, false, err367 }368 var traces pb.Traces369 err = traces.UnmarshalMsgDictionary(buf.Bytes())370 return &pb.TracerPayload{371 LanguageName: ts.Lang,372 LanguageVersion: ts.LangVersion,373 ContainerID: req.Header.Get(headerContainerID),374 Chunks: traceChunksFromTraces(traces),375 TracerVersion: ts.TracerVersion,376 }, true, err377 case V07:378 buf := getBuffer()379 defer putBuffer(buf)380 if _, err = io.Copy(buf, req.Body); err != nil {381 return nil, false, err382 }383 var tracerPayload pb.TracerPayload384 _, err = 
tracerPayload.UnmarshalMsg(buf.Bytes())385 return &tracerPayload, true, err386 default:387 var traces pb.Traces388 if ranHook, err = decodeRequest(req, &traces); err != nil {389 return nil, false, err390 }391 return &pb.TracerPayload{392 LanguageName: ts.Lang,393 LanguageVersion: ts.LangVersion,394 ContainerID: req.Header.Get(headerContainerID),395 Chunks: traceChunksFromTraces(traces),396 TracerVersion: ts.TracerVersion,397 }, ranHook, nil398 }399}400// replyOK replies to the given http.ReponseWriter w based on the endpoint version, with either status 200/OK401// or with a list of rates by service. It returns the number of bytes written along with reporting if the operation402// was successful.403func (r *HTTPReceiver) replyOK(req *http.Request, v Version, w http.ResponseWriter) (n uint64, ok bool) {404 switch v {405 case v01, v02, v03:406 return httpOK(w)407 default:408 ratesVersion := req.Header.Get(headerRatesPayloadVersion)409 return httpRateByService(ratesVersion, w, r.dynConf)410 }411}412// rateLimited reports whether n number of traces should be rejected by the API.413func (r *HTTPReceiver) rateLimited(n int64) bool {414 if n == 0 {415 return false416 }417 if r.conf.MaxMemory == 0 && r.conf.MaxCPU == 0 {418 // rate limiting is off419 return false420 }421 return !r.RateLimiter.Permits(n)422}423// StatsProcessor implementations are able to process incoming client stats.424type StatsProcessor interface {425 // ProcessStats takes a stats payload and consumes it. 
It is considered to be originating426 // from the given lang.427 ProcessStats(p pb.ClientStatsPayload, lang, tracerVersion string)428}429// handleStats handles incoming stats payloads.430func (r *HTTPReceiver) handleStats(w http.ResponseWriter, req *http.Request) {431 defer timing.Since("datadog.trace_agent.receiver.stats_process_ms", time.Now())432 ts := r.tagStats(V07, req.Header)433 rd := apiutil.NewLimitedReader(req.Body, r.conf.MaxRequestBytes)434 req.Header.Set("Accept", "application/msgpack")435 var in pb.ClientStatsPayload436 if err := msgp.Decode(rd, &in); err != nil {437 httpDecodingError(err, []string{"handler:stats", "codec:msgpack", "v:v0.6"}, w)438 return439 }440 metrics.Count("datadog.trace_agent.receiver.stats_payload", 1, ts.AsTags(), 1)441 metrics.Count("datadog.trace_agent.receiver.stats_bytes", rd.Count, ts.AsTags(), 1)442 metrics.Count("datadog.trace_agent.receiver.stats_buckets", int64(len(in.Stats)), ts.AsTags(), 1)443 r.statsProcessor.ProcessStats(in, req.Header.Get(headerLang), req.Header.Get(headerTracerVersion))444}445// handleTraces knows how to handle a bunch of traces446func (r *HTTPReceiver) handleTraces(v Version, w http.ResponseWriter, req *http.Request) {447 ts := r.tagStats(v, req.Header)448 tracen, err := traceCount(req)449 if err == nil && r.rateLimited(tracen) {450 // this payload can not be accepted451 io.Copy(ioutil.Discard, req.Body) //nolint:errcheck452 w.WriteHeader(r.rateLimiterResponse)453 r.replyOK(req, v, w)454 ts.PayloadRefused.Inc()455 return456 }457 if err == errInvalidHeaderTraceCountValue {458 log.Errorf("Failed to count traces: %s", err)459 }460 start := time.Now()461 tp, ranHook, err := decodeTracerPayload(v, req, ts)462 defer func(err error) {463 tags := append(ts.AsTags(), fmt.Sprintf("success:%v", err == nil))464 metrics.Histogram("datadog.trace_agent.receiver.serve_traces_ms", float64(time.Since(start))/float64(time.Millisecond), tags, 1)465 }(err)466 if err != nil {467 httpDecodingError(err, 
[]string{"handler:traces", fmt.Sprintf("v:%s", v)}, w)468 switch err {469 case apiutil.ErrLimitedReaderLimitReached:470 ts.TracesDropped.PayloadTooLarge.Add(tracen)471 case io.EOF, io.ErrUnexpectedEOF, msgp.ErrShortBytes:472 ts.TracesDropped.EOF.Add(tracen)473 default:474 if err, ok := err.(net.Error); ok && err.Timeout() {475 ts.TracesDropped.Timeout.Add(tracen)476 } else {477 ts.TracesDropped.DecodingError.Add(tracen)478 }479 }480 log.Errorf("Cannot decode %s traces payload: %v", v, err)481 return482 }483 if !ranHook {484 // The decoder of this request did not run the pb.MetaHook. The user is either using485 // a deprecated endpoint or Content-Type, or, a new decoder was implemented and the486 // the hook was not added.487 log.Debug("Decoded the request without running pb.MetaHook. If this is a newly implemented endpoint, please make sure to run it!")488 if _, ok := pb.MetaHook(); ok {489 log.Warn("Received request on deprecated API endpoint or Content-Type. Performance is degraded. If you think this is an error, please contact support with this message.")490 runMetaHook(tp.Chunks)491 }492 }493 if n, ok := r.replyOK(req, v, w); ok {494 tags := append(ts.AsTags(), "endpoint:traces_"+string(v))495 metrics.Histogram("datadog.trace_agent.receiver.rate_response_bytes", float64(n), tags, 1)496 }497 ts.TracesReceived.Add(int64(len(tp.Chunks)))498 ts.TracesBytes.Add(req.Body.(*apiutil.LimitedReader).Count)499 ts.PayloadAccepted.Inc()500 if ctags := getContainerTags(r.conf.ContainerTags, tp.ContainerID); ctags != "" {501 if tp.Tags == nil {502 tp.Tags = make(map[string]string)503 }504 tp.Tags[tagContainersTags] = ctags505 }506 payload := &Payload{507 Source: ts,508 TracerPayload: tp,509 ClientComputedTopLevel: req.Header.Get(headerComputedTopLevel) != "",510 ClientComputedStats: req.Header.Get(headerComputedStats) != "",511 ClientDroppedP0s: droppedTracesFromHeader(req.Header, ts),512 }513 select {514 case r.out <- payload:515 // ok516 default:517 // channel blocked, add 
a goroutine to ensure we never drop518 r.wg.Add(1)519 go func() {520 metrics.Count("datadog.trace_agent.receiver.queued_send", 1, nil, 1)521 defer func() {522 r.wg.Done()523 watchdog.LogOnPanic()524 }()525 r.out <- payload526 }()527 }528}529// runMetaHook runs the pb.MetaHook on all spans from traces.530func runMetaHook(chunks []*pb.TraceChunk) {531 hook, ok := pb.MetaHook()532 if !ok {533 return534 }535 for _, chunk := range chunks {536 for _, span := range chunk.Spans {537 for k, v := range span.Meta {538 if newv := hook(k, v); newv != v {539 span.Meta[k] = newv540 }541 }542 }543 }544}545func droppedTracesFromHeader(h http.Header, ts *info.TagStats) int64 {546 var dropped int64547 if v := h.Get(headerDroppedP0Traces); v != "" {548 count, err := strconv.ParseInt(v, 10, 64)549 if err == nil {550 dropped = count551 ts.ClientDroppedP0Traces.Add(count)552 }553 }554 if v := h.Get(headerDroppedP0Spans); v != "" {555 count, err := strconv.ParseInt(v, 10, 64)556 if err == nil {557 ts.ClientDroppedP0Spans.Add(count)558 }559 }560 return dropped561}562// handleServices handle a request with a list of several services563func (r *HTTPReceiver) handleServices(v Version, w http.ResponseWriter, req *http.Request) {564 httpOK(w)565 // Do nothing, services are no longer being sent to Datadog as of July 2019566 // and are now automatically extracted from traces.567}568// loop periodically submits stats about the receiver to statsd569func (r *HTTPReceiver) loop() {570 defer close(r.exit)571 var lastLog time.Time572 accStats := info.NewReceiverStats()573 t := time.NewTicker(10 * time.Second)574 defer t.Stop()575 tw := time.NewTicker(r.conf.WatchdogInterval)576 defer tw.Stop()577 for {578 select {579 case <-r.exit:580 return581 case now := <-tw.C:582 r.watchdog(now)583 case now := <-t.C:584 metrics.Gauge("datadog.trace_agent.heartbeat", 1, nil, 1)585 metrics.Gauge("datadog.trace_agent.receiver.out_chan_fill", float64(len(r.out))/float64(cap(r.out)), nil, 1)586 // We update accStats 
with the new stats we collected587 accStats.Acc(r.Stats)588 // Publish the stats accumulated during the last flush589 r.Stats.Publish()590 // We reset the stats accumulated during the last 10s.591 r.Stats.Reset()592 if now.Sub(lastLog) >= time.Minute {593 // We expose the stats accumulated to expvar594 info.UpdateReceiverStats(accStats)595 accStats.LogStats()596 // We reset the stats accumulated during the last minute597 accStats.Reset()598 lastLog = now599 // Also publish rates by service (they are updated by receiver)600 rates := r.dynConf.RateByService.GetNewState("").Rates601 info.UpdateRateByService(rates)602 }603 }604 }605}606// killProcess exits the process with the given msg; replaced in tests.607var killProcess = func(format string, a ...interface{}) {608 log.Criticalf(format, a...)609 os.Exit(1)610}611// watchdog checks the trace-agent's heap and CPU usage and updates the rate limiter using a correct612// sampling rate to maintain resource usage within set thresholds. These thresholds are defined by613// the configuration MaxMemory and MaxCPU. If these values are 0, all limits are disabled and the rate614// limiter will accept everything.615func (r *HTTPReceiver) watchdog(now time.Time) {616 wi := watchdog.Info{617 Mem: watchdog.Mem(),618 CPU: watchdog.CPU(now),619 }620 rateMem := 1.0621 if r.conf.MaxMemory > 0 {622 if current, allowed := float64(wi.Mem.Alloc), r.conf.MaxMemory*1.5; current > allowed {623 // This is a safety mechanism: if the agent is using more than 1.5x max. memory, there624 // is likely a leak somewhere; we'll kill the process to avoid polluting host memory.625 metrics.Count("datadog.trace_agent.receiver.oom_kill", 1, nil, 1)626 metrics.Flush()627 log.Criticalf("Killing process. 
Memory threshold exceeded: %.2fM / %.2fM", current/1024/1024, allowed/1024/1024)628 killProcess("OOM")629 }630 rateMem = computeRateLimitingRate(r.conf.MaxMemory, float64(wi.Mem.Alloc), r.RateLimiter.RealRate())631 if rateMem < 1 {632 log.Warnf("Memory threshold exceeded (apm_config.max_memory: %.0f bytes): %d", r.conf.MaxMemory, wi.Mem.Alloc)633 }634 }635 rateCPU := 1.0636 if r.conf.MaxCPU > 0 {637 rateCPU = computeRateLimitingRate(r.conf.MaxCPU, wi.CPU.UserAvg, r.RateLimiter.RealRate())638 if rateCPU < 1 {639 log.Warnf("CPU threshold exceeded (apm_config.max_cpu_percent: %.0f): %.0f", r.conf.MaxCPU*100, wi.CPU.UserAvg*100)640 }641 }642 r.RateLimiter.SetTargetRate(math.Min(rateCPU, rateMem))643 stats := r.RateLimiter.Stats()644 info.UpdateRateLimiter(*stats)645 info.UpdateWatchdogInfo(wi)646 metrics.Gauge("datadog.trace_agent.heap_alloc", float64(wi.Mem.Alloc), nil, 1)647 metrics.Gauge("datadog.trace_agent.cpu_percent", wi.CPU.UserAvg*100, nil, 1)648 metrics.Gauge("datadog.trace_agent.receiver.ratelimit", stats.TargetRate, nil, 1)649}650// Languages returns the list of the languages used in the traces the agent receives.651func (r *HTTPReceiver) Languages() string {652 // We need to use this map because we can have several tags for a same language.653 langs := make(map[string]bool)654 str := []string{}655 r.Stats.RLock()656 for tags := range r.Stats.Stats {657 if _, ok := langs[tags.Lang]; !ok {658 str = append(str, tags.Lang)659 langs[tags.Lang] = true660 }661 }662 r.Stats.RUnlock()663 sort.Strings(str)664 return strings.Join(str, "|")665}666// decodeRequest decodes the payload in http request `req` into `dest`.667// It handles only v02, v03, v04 requests.668// - ranHook reports whether the decoder was able to run the pb.MetaHook669// - err is the first error encountered670func decodeRequest(req *http.Request, dest *pb.Traces) (ranHook bool, err error) {671 switch mediaType := getMediaType(req); mediaType {672 case "application/msgpack":673 buf := getBuffer()674 
defer putBuffer(buf)675 _, err = io.Copy(buf, req.Body)676 if err != nil {677 return false, err678 }679 _, err = dest.UnmarshalMsg(buf.Bytes())680 return true, err681 case "application/json":682 fallthrough683 case "text/json":684 fallthrough685 case "":686 err = json.NewDecoder(req.Body).Decode(&dest)687 return false, err688 default:689 // do our best690 if err1 := json.NewDecoder(req.Body).Decode(&dest); err1 != nil {691 buf := getBuffer()692 defer putBuffer(buf)693 _, err2 := io.Copy(buf, req.Body)694 if err2 != nil {695 return false, err2696 }697 _, err2 = dest.UnmarshalMsg(buf.Bytes())698 return true, err2699 }700 return false, nil701 }702}703func traceChunksFromSpans(spans []pb.Span) []*pb.TraceChunk {704 traceChunks := []*pb.TraceChunk{}705 byID := make(map[uint64][]*pb.Span)706 for _, s := range spans {707 byID[s.TraceID] = append(byID[s.TraceID], &s)708 }709 for _, t := range byID {710 traceChunks = append(traceChunks, &pb.TraceChunk{711 Priority: int32(sampler.PriorityNone),712 Spans: t,713 })714 }715 return traceChunks716}717func traceChunksFromTraces(traces pb.Traces) []*pb.TraceChunk {718 traceChunks := make([]*pb.TraceChunk, 0, len(traces))719 for _, trace := range traces {720 traceChunks = append(traceChunks, &pb.TraceChunk{721 Priority: int32(sampler.PriorityNone),722 Spans: trace,723 })724 }725 return traceChunks726}727// getContainerTag returns container and orchestrator tags belonging to containerID. 
If containerID728// is empty or no tags are found, an empty string is returned.729func getContainerTags(fn func(string) ([]string, error), containerID string) string {730 if containerID == "" {731 return ""732 }733 if fn == nil {734 log.Warn("ContainerTags not configured")735 return ""736 }737 list, err := fn(containerID)738 if err != nil {739 log.Tracef("Getting container tags for ID %q: %v", containerID, err)740 return ""741 }742 log.Tracef("Getting container tags for ID %q: %v", containerID, list)743 return strings.Join(list, ",")744}745// getMediaType attempts to return the media type from the Content-Type MIME header. If it fails746// it returns the default media type "application/json".747func getMediaType(req *http.Request) string {748 mt, _, err := mime.ParseMediaType(req.Header.Get("Content-Type"))749 if err != nil {750 log.Debugf(`Error parsing media type: %v, assuming "application/json"`, err)751 return "application/json"752 }753 return mt754}...

Full Screen

Full Screen

replit.go

Source:replit.go Github

copy

Full Screen

1package main2import (3 "bytes"4 "errors"5 "fmt"6 "io/ioutil"7 "os"8 "os/exec"9 "os/signal"10 "path/filepath"11 "strings"12 "sync"13 "syscall"14 "time"15 "github.com/docopt/docopt-go"16)17type EditorFile struct {18 IsTempFile bool19 File *os.File20}21type ReplitArgs struct {22 EditorFile *EditorFile23 Dpath string24 Lang string25}26// List all files in directory27func ListDirectory(dir string) (*[]string, error) {28 dirInfo, err := os.Stat(dir)29 if err != nil {30 return nil, err31 }32 if !dirInfo.IsDir() {33 return nil, errors.New(dir + " was not a directory.")34 }35 files := []string{}36 // walk through directory and append files to a slice.37 filepath.Walk(dir, func(fpath string, info os.FileInfo, err error) error {38 if err != nil {39 return err40 }41 if !info.IsDir() {42 files = append(files, fpath)43 }44 return nil45 })46 return &files, nil47}48// Check the requested language49func ValidateLanguage(language string) error {50 if !CommandExists(language) {51 return fmt.Errorf("language %s is not in PATH", language)52 }53 return nil54}55// Create and open a temporary file56func TargetFile(file string, lang string) (*EditorFile, error) {57 if len(file) == 0 {58 tgt, err := ioutil.TempFile("/tmp", "replit")59 if err != nil {60 return nil, err61 }62 tgt.WriteString("#!/usr/bin/env " + lang + "\n")63 return &EditorFile{64 true,65 tgt,66 }, nil67 }68 conn, err := os.Open(file)69 if err != nil {70 return nil, err71 }72 return &EditorFile{73 false,74 conn,75 }, nil76}77// Check whether a command exists78func CommandExists(cmd string) bool {79 _, err := exec.LookPath(cmd)80 return err == nil81}82// Get the user's preferred visual editor83func GetEditor() (string, error) {84 visual := os.Getenv("VISUAL")85 editor := ""86 if len(visual) > 0 {87 editor = visual88 } else {89 editor = "code"90 }91 if !CommandExists(editor) {92 return "", errors.New("the command '" + editor + "' is not in PATH; is it installed and available as a command?")93 }94 return editor, nil95}96// 
Launch the user's visual-editor, falling back to VSCode as a default.97func LaunchEditor(editorChan chan<- *exec.Cmd, file *EditorFile) {98 editor, _ := GetEditor()99 var cmd *exec.Cmd100 if editor == "code" {101 // having to change line-position is a little irritating102 cmd = exec.Command(editor, "--goto", file.File.Name()+":2")103 } else {104 cmd = exec.Command(editor, file.File.Name())105 }106 cmd.Start()107 editorChan <- cmd108}109type FileWatcher struct {110 Done bool111 Files *[]string112}113func (watch *FileWatcher) Stop() {114 watch.Done = true115}116func (watch *FileWatcher) Stdin() *bytes.Buffer {117 byteStr := []byte(strings.Join(*watch.Files, "\n"))118 return bytes.NewBuffer(byteStr)119}120func (watch *FileWatcher) Start(tui *TUI) {121 go func() {122 for {123 if watch.Done {124 return125 }126 cmd := exec.Command("entr", "-zps", "echo 0")127 cmd.Stdin = watch.Stdin()128 cmd.Run()129 tui.actions.fileChange.Broadcast()130 }131 }()132}133// Observe file-changes134func ObserveFileChanges(args *ReplitArgs, tui *TUI) (FileWatcher, error) {135 targetFile := args.EditorFile136 dpath := args.Dpath137 var files *[]string138 if targetFile.IsTempFile {139 files = &[]string{targetFile.File.Name()}140 } else {141 var err error142 files, err = ListDirectory(dpath)143 if err != nil {144 return FileWatcher{}, err145 }146 }147 watch := FileWatcher{false, files}148 return watch, nil149}150// Read docopt arguments and return parsed, provided parameters151func ReadArgs(opts docopt.Opts) (ReplitArgs, int) {152 dir, _ := opts.String("--directory")153 if len(dir) == 0 {154 dir, _ = os.Getwd()155 }156 dpath, err := filepath.Abs(dir)157 if err != nil {158 println("replit: failed to resolve directory path")159 return ReplitArgs{}, 1160 }161 lang, err := opts.String("<lang>")162 if err != nil {163 println("replit: could not read language")164 return ReplitArgs{}, 1165 }166 // check the editor is present; ignore the value for the moment167 _, err = GetEditor()168 if err != nil {169 
panic(err)170 }171 langErr := ValidateLanguage(lang)172 if langErr != nil {173 panic(langErr)174 }175 file, _ := opts.String("<file>")176 targetFile, err := TargetFile(file, lang)177 if err != nil {178 panic(err)179 }180 return ReplitArgs{181 targetFile,182 dpath,183 lang,184 }, -1185}186type LanguageState struct {187 Time time.Time188 Lock sync.Mutex189 Cmd *exec.Cmd190}191func RunLanguage(args *ReplitArgs, tui *TUI, state *LanguageState) {192 var cmd *exec.Cmd193 // kill the running process194 killProcess := func() {195 if cmd != nil {196 cmd.Process.Kill()197 cmd = nil198 }199 }200 // update stdout201 onFileChange := func() {202 now := time.Now()203 threshold := time.Second * 2204 if state.Time.Sub(now) > threshold {205 // too slow; even though it is running just kill the process and release the lock206 // so the new content can be run.207 if cmd != nil {208 cmd.Process.Kill()209 state.Lock.Unlock()210 }211 }212 state.Lock.Lock()213 // clear stdout214 stdoutViewer := tui.stdoutViewer215 stderrViewer := tui.stderrViewer216 stdoutViewer.Lock()217 stdoutViewer.Clear()218 stdoutViewer.Unlock()219 stderrViewer.Lock()220 stderrViewer.Clear()221 stderrViewer.Unlock()222 state.Time = now223 // call the language against a file224 cmd = exec.Command(args.Lang, args.EditorFile.File.Name())225 cmd.Stdout = stdoutViewer226 cmd.Stderr = stderrViewer227 startCommandTime := time.Now()228 done := false229 go func() {230 for {231 if done {232 break233 }234 time.Sleep(time.Millisecond * 25)235 tui.UpdateRunTime(time.Since(startCommandTime))236 tui.app.Draw()237 }238 }()239 cmd.Run()240 tui.UpdateRunCount()241 done = true242 tui.app.Draw()243 state.Lock.Unlock()244 }245 // run on process-kill246 attachListener(tui.actions.killProcess, killProcess)247 // run on file-change248 attachListener(tui.actions.fileChange, onFileChange)249}250// Core application251func ReplIt(opts docopt.Opts) int {252 // read and validate arguments253 args, exitCode := ReadArgs(opts)254 if exitCode >= 0 
{255 return exitCode256 }257 tui := NewUI(&args)258 tui.SetTheme()259 go func(tui *TUI) {260 tui.Start()261 }(tui)262 editorChan := make(chan *exec.Cmd)263 // launch an editor asyncronously264 go LaunchEditor(editorChan, args.EditorFile)265 // start entr; read the file (and optionally a directory) and live-reload266 state := LanguageState{267 time.Now(),268 sync.Mutex{},269 nil,270 }271 fileWatcher, err := ObserveFileChanges(&args, tui)272 if err != nil {273 panic(err)274 }275 go fileWatcher.Start(tui)276 go RunLanguage(&args, tui, &state)277 // Terminate program when an exit signal is received, and tidy up termporary files and processes278 sigs := make(chan os.Signal, 1)279 signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)280 <-sigs281 close(sigs)282 var doneGroup sync.WaitGroup283 doneGroup.Add(2)284 // close each channel285 // kill editor286 go func() {287 defer doneGroup.Done()288 editor := <-editorChan289 editor.Process.Kill()290 close(editorChan)291 }()292 // remove temporary file293 go func() {294 defer doneGroup.Done()295 targetFile := args.EditorFile296 if targetFile.IsTempFile {297 name := targetFile.File.Name()298 os.Remove(name)299 }300 }()301 tui.app.Stop()302 doneGroup.Wait()303 return 0304}...

Full Screen

Full Screen

tui.go

Source:tui.go Github

copy

Full Screen

1package main2import (3 "fmt"4 "sync"5 "time"6 "github.com/gdamore/tcell/v2"7 "github.com/rivo/tview"8)9type TUI struct {10 actions *TuiActions11 header *tview.TextView12 app *tview.Application13 stdoutViewer *tview.TextView14 stderrViewer *tview.TextView15 helpBar *tview.TextView16 runCountViewer *tview.TextView17 runSecondsViewer *tview.TextView18 runCount int6419 runTime int6420}21// Set initial theme overrides, so tview uses default22// system colours rather than tcell theme overrides23func (tui *TUI) SetTheme() {24 tview.Styles.PrimitiveBackgroundColor = tcell.ColorDefault25 tview.Styles.ContrastBackgroundColor = tcell.ColorDefault26}27func NewHeader(tui *TUI) *tview.TextView {28 return tview.NewTextView().29 SetDynamicColors(true).30 SetText(HEADER_TEXT)31}32type TuiActions struct {33 killProcess *sync.Cond34 fileChange *sync.Cond35}36func NewActions(tui *TUI) *TuiActions {37 return &TuiActions{38 killProcess: sync.NewCond(&sync.Mutex{}),39 fileChange: sync.NewCond(&sync.Mutex{}),40 }41}42// Attach a listener for a sync broadcast43func attachListener(cd *sync.Cond, listener func()) {44 var wg sync.WaitGroup45 wg.Add(1)46 go func() {47 wg.Done()48 cd.L.Lock()49 defer cd.L.Unlock()50 cd.Wait()51 listener()52 go attachListener(cd, listener)53 }()54 wg.Wait()55}56// TView application57func NewApplication(tui *TUI) *tview.Application {58 onInput := func(event *tcell.EventKey) *tcell.EventKey {59 if event.Rune() == 'k' {60 tui.actions.killProcess.Broadcast()61 return nil62 }63 if event.Key() == tcell.KeyEscape || event.Rune() == 'q' {64 panic("implement quit")65 }66 return event67 }68 return tview.NewApplication().69 EnableMouse(true).70 SetInputCapture(onInput)71}72// Show command output text73func NewStdoutViewer(tui *TUI) *tview.TextView {74 view := tview.NewTextView().75 SetDynamicColors(true)76 view.77 SetText(STDOUT_TEXT).Box.SetBorder(true)78 return view79}80// Show command output text81func NewStderrViewer(tui *TUI) *tview.TextView {82 view := 
tview.NewTextView().83 SetDynamicColors(true)84 view.85 SetText(STDERR_TEXT).Box.SetBorder(true)86 return view87}88func NewRunCount(tui *TUI) *tview.TextView {89 return tview.NewTextView().90 SetDynamicColors(true).91 SetText("run " + fmt.Sprint(tui.runCount) + " times")92}93func NewRunTime(tui *TUI) *tview.TextView {94 return tview.NewTextView().95 SetDynamicColors(true).96 SetText(fmt.Sprint(tui.runTime) + "ms")97}98// Construct all UI components99func NewUI(args *ReplitArgs) *TUI {100 tui := TUI{}101 tui.SetTheme()102 tui.actions = NewActions(&tui)103 tui.app = NewApplication(&tui)104 tui.header = NewHeader(&tui)105 tui.helpBar = NewHelpbar(&tui, args)106 tui.stdoutViewer = NewStdoutViewer(&tui)107 tui.stderrViewer = NewStderrViewer(&tui)108 tui.runCountViewer = NewRunCount(&tui)109 tui.runSecondsViewer = NewRunTime(&tui)110 return &tui111}112func (tui *TUI) UpdateRunCount() {113 tui.runCount += 1114 tui.runCountViewer.SetText("run " + fmt.Sprint(tui.runCount) + " times")115}116func (tui *TUI) UpdateRunTime(diff time.Duration) {117 tui.runTime = diff.Milliseconds()118 tui.runSecondsViewer.SetText(fmt.Sprint(tui.runTime) + "ms")119}120// Arrange TUI components into a grid121func (tui *TUI) Grid() *tview.Grid {122 return tview.NewGrid().123 SetBorders(false).124 SetRows(1, 0, 1, 1).125 SetColumns(-4, -2, -1, -1).126 AddItem(tui.header, ROW_0, COL_0, ROWSPAN_1, COLSPAN_2, MINWIDTH_0, MINHEIGHT_0, true).127 AddItem(tui.runCountViewer, ROW_0, COL_2, ROWSPAN_1, COLSPAN_1, MINWIDTH_0, MINHEIGHT_0, true).128 AddItem(tui.runSecondsViewer, ROW_0, COL_3, ROWSPAN_1, COLSPAN_1, MINWIDTH_0, MINHEIGHT_0, true).129 AddItem(tui.stdoutViewer, ROW_1, COL_0, ROWSPAN_1, COLSPAN_1, MINWIDTH_0, MINHEIGHT_0, true).130 AddItem(tui.stderrViewer, ROW_1, COL_1, ROWSPAN_1, COLSPAN_3, MINWIDTH_0, MINHEIGHT_0, true).131 AddItem(tview.NewTextView(), ROW_2, COL_0, ROWSPAN_1, COLSPAN_4, MINWIDTH_0, MINHEIGHT_0, false).132 AddItem(tui.helpBar, ROW_3, COL_0, ROWSPAN_1, COLSPAN_4, MINWIDTH_0, 
MINHEIGHT_0, false)133}134// Start the TUI135func (tui *TUI) Start() {136 grid := tui.Grid()137 if err := tui.app.SetRoot(grid, true).SetFocus(grid).Run(); err != nil {138 fmt.Printf("RL: Application crashed! %v", err)139 }140}141// Show help-text to help user's use Replit142func NewHelpbar(tui *TUI, args *ReplitArgs) *tview.TextView {143 return tview.NewTextView().144 SetDynamicColors(true).145 SetText("Edit [red]" + args.EditorFile.File.Name() + "[reset] & save to run with [red]" + args.Lang + "[reset]")146}...

Full Screen

Full Screen

KillProcess

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 if runtime.GOOS == "windows" {4 cmd = exec.Command("C:\\WINDOWS\\System32\\WindowsPowerShell\\v1.0\\powershell.exe", "Start-Sleep", "1000")5 } else {6 cmd = exec.Command("sleep", "1000")7 }8 if err := cmd.Start(); err != nil {9 fmt.Println("Error starting process: ", err)10 os.Exit(1)11 }12 fmt.Println("Process id is: ", cmd.Process.Pid)13 if err := cmd.Process.Kill(); err != nil {14 fmt.Println("Error killing process: ", err)15 os.Exit(1)16 }17 fmt.Println("Process killed successfully")18}19import (20func main() {21 pid := int32(1320)22 process, err := process.NewProcess(pid)23 if err != nil {24 fmt.Println("Error getting process: ", err)25 os.Exit(1)26 }27 err = process.Kill()28 if err != nil {29 fmt.Println("Error killing process: ", err)30 os.Exit(1)31 }32 fmt.Println("Process killed successfully")33}

Full Screen

Full Screen

KillProcess

Using AI Code Generation

copy

Full Screen

1import "lang"2import "fmt"3func main() {4 fmt.Println("Killing process with PID 1")5 lang.KillProcess(1)6}7import "lang"8import "fmt"9func main() {10 fmt.Println("Killing process with PID 2")11 lang.KillProcess(2)12}

Full Screen

Full Screen

KillProcess

Using AI Code Generation

copy

Full Screen

import (
	"fmt"
	"os"
)

// main looks up the current process by its own PID and kills it.
func main() {
	p, err := os.FindProcess(os.Getpid())
	if err != nil {
		// Without a process handle there is nothing to kill; stop here
		// instead of calling Kill on a nil *os.Process.
		fmt.Println(err)
		return
	}

	if err := p.Kill(); err != nil {
		fmt.Println(err)
	}
}

Full Screen

Full Screen

KillProcess

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 fmt.Println("Hello, world.")4 lang.KillProcess()5}6import "fmt"7func KillProcess() {8 fmt.Println("Process killed")9}

Full Screen

Full Screen

KillProcess

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 lang.KillProcess(1234)4}5func KillProcess(pid int) bool {6 process, err := os.FindProcess(pid)7 if err != nil {8 fmt.Println(err)9 }10 err = process.Kill()11 if err != nil {12 fmt.Println(err)13 }14 fmt.Println("Process killed successfully")15}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Gauge automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Most used method in

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful