...24 s := &StatsdSink{25 addr: addr,26 metricQueue: make(chan string, 4096),27 }28 go s.flushMetrics()29 return s, nil30}31// Close is used to stop flushing to statsd32func (s *StatsdSink) Shutdown() {33 close(s.metricQueue)34}35func (s *StatsdSink) SetGauge(key []string, val float32) {36 flatKey := s.flattenKey(key)37 s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))38}39func (s *StatsdSink) EmitKey(key []string, val float32) {40 flatKey := s.flattenKey(key)41 s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))42}43func (s *StatsdSink) IncrCounter(key []string, val float32) {44 flatKey := s.flattenKey(key)45 s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))46}47func (s *StatsdSink) AddSample(key []string, val float32) {48 flatKey := s.flattenKey(key)49 s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))50}51// Flattens the key for formatting, removes spaces52func (s *StatsdSink) flattenKey(parts []string) string {53 joined := strings.Join(parts, ".")54 return strings.Map(func(r rune) rune {55 switch r {56 case ':':57 fallthrough58 case ' ':59 return '_'60 default:61 return r62 }63 }, joined)64}65// Does a non-blocking push to the metrics queue66func (s *StatsdSink) pushMetric(m string) {67 select {68 case s.metricQueue <- m:69 default:70 }71}72// Flushes metrics73func (s *StatsdSink) flushMetrics() {74 var sock net.Conn75 var err error76 var wait <-chan time.Time77 ticker := time.NewTicker(flushInterval)78 defer ticker.Stop()79CONNECT:80 // Create a buffer81 buf := bytes.NewBuffer(nil)82 // Attempt to connect83 sock, err = net.Dial("udp", s.addr)84 if err != nil {85 log.Printf("[ERR] Error connecting to statsd! Err: %s", err)86 goto WAIT87 }...

