...91 fields[key] = val92 }93 return fields94}95func (c *Config) logtailConn(ctx context.Context, referenceID string, since time.Time) (*websocket.Conn, error) {96 u, err := url.Parse(c.LogsTailURL.String)97 if err != nil {98 return nil, fmt.Errorf("couldn't parse cloud logs host %w", err)99 }100 u.RawQuery = fmt.Sprintf(`query={test_run_id="%s"}&start=%d`, referenceID, since.UnixNano())101 headers := make(http.Header)102 headers.Add("Sec-WebSocket-Protocol", "token="+c.Token.String)103 var conn *websocket.Conn104 err = retry(sleeperFunc(time.Sleep), 3, 5*time.Second, 2*time.Minute, func() (err error) {105 // We don't need to close the http body or use it for anything until we want to actually log106 // what the server returned as body when it errors out107 conn, _, err = websocket.DefaultDialer.DialContext(ctx, u.String(), headers) //nolint:bodyclose108 return err109 })110 if err != nil {111 return nil, err112 }113 return conn, nil114}115// StreamLogsToLogger streams the logs for the configured test to the provided logger until ctx is116// Done or an error occurs.117func (c *Config) StreamLogsToLogger(118 ctx context.Context, logger logrus.FieldLogger, referenceID string, tailFrom time.Duration,119) error {120 var mconn sync.Mutex121 conn, err := c.logtailConn(ctx, referenceID, time.Now().Add(-tailFrom))122 if err != nil {123 return err124 }125 go func() {126 <-ctx.Done()127 mconn.Lock()128 defer mconn.Unlock()129 _ = conn.WriteControl(130 websocket.CloseMessage,131 websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing"),132 time.Now().Add(time.Second))133 _ = conn.Close()134 }()135 msgBuffer := make(chan []byte, 10)136 defer close(msgBuffer)137 var mostRecent int64138 go func() {139 for message := range msgBuffer {140 var m msg141 err := easyjson.Unmarshal(message, &m)142 if err != nil {143 logger.WithError(err).Errorf("couldn't unmarshal a message from the cloud: %s", string(message))144 continue145 }146 ts := m.Log(logger)147 atomic.StoreInt64(&mostRecent, ts)148 }149 }()150 for {151 _, message, err := conn.ReadMessage()152 select { // check if we should stop before continuing153 case <-ctx.Done():154 return nil155 default:156 }157 if err != nil {158 logger.WithError(err).Warn("error reading a log message from the cloud, trying to establish a fresh connection with the logs service...") //nolint:lll159 var since time.Time160 if ts := atomic.LoadInt64(&mostRecent); ts > 0 {161 // add 1ns for avoid possible repetition162 since = time.Unix(0, ts).Add(time.Nanosecond)163 } else {164 since = time.Now()165 }166 // TODO: avoid the "logical" race condition167 // The case explained:168 // * The msgBuffer consumer is slow169 // * ReadMessage is fast and adds at least one more message in the buffer170 // * An error is got in the meantime and the re-dialing procedure is tried171 // * Then the latest timestamp used will not be the real latest received172 // * because it is still waiting to be processed.173 // In the case the connection will be restored then the first message will be a duplicate.174 newconn, errd := c.logtailConn(ctx, referenceID, since)175 if errd != nil {176 // return the main error177 return err178 }179 mconn.Lock()180 conn = newconn181 mconn.Unlock()182 continue183 }184 select {185 case <-ctx.Done():186 return nil187 case msgBuffer <- message:188 }...

Full Screen

1import (2func main() {3 logtailConn, err := client.LogtailConn("myinstance")4 if err != nil {5 panic(err)6 }7 fmt.Println(logtailConn)8}

