Best Venom code snippet using mqtt.New
vernemq_test.go
Source:vernemq_test.go
...14)15func Test_readTestData(t *testing.T) {16 assert.NotNil(t, metricsV1101MQTTv5)17}18func TestNew(t *testing.T) {19 assert.Implements(t, (*module.Module)(nil), New())20}21func TestVerneMQ_Init(t *testing.T) {22 verneMQ := prepareVerneMQ()23 assert.True(t, verneMQ.Init())24}25func TestVerneMQ_Init_ReturnsFalseIfURLIsNotSet(t *testing.T) {26 verneMQ := prepareVerneMQ()27 verneMQ.URL = ""28 assert.False(t, verneMQ.Init())29}30func TestVerneMQ_Init_ReturnsFalseIfClientWrongTLSCA(t *testing.T) {31 verneMQ := prepareVerneMQ()32 verneMQ.Client.TLSConfig.TLSCA = "testdata/tls"33 assert.False(t, verneMQ.Init())34}35func TestVerneMQ_Check(t *testing.T) {36 verneMQ, srv := prepareClientServerV1101(t)37 defer srv.Close()38 assert.True(t, verneMQ.Check())39}40func TestVerneMQ_Check_ReturnsFalseIfConnectionRefused(t *testing.T) {41 verneMQ := prepareVerneMQ()42 require.True(t, verneMQ.Init())43 assert.False(t, verneMQ.Check())44}45func TestVerneMQ_Check_ReturnsFalseIfMetricsAreNotVerneMQ(t *testing.T) {46 verneMQ, srv := prepareClientServerNotVerneMQ(t)47 defer srv.Close()48 require.True(t, verneMQ.Init())49 assert.False(t, verneMQ.Check())50}51func TestVerneMQ_Charts(t *testing.T) {52 assert.NotNil(t, New().Charts())53}54func TestVerneMQ_Cleanup(t *testing.T) {55 assert.NotPanics(t, New().Cleanup)56}57func TestVerneMQ_Collect(t *testing.T) {58 verneMQ, srv := prepareClientServerV1101(t)59 defer srv.Close()60 collected := verneMQ.Collect()61 assert.Equal(t, v1101ExpectedMetrics, collected)62 testCharts(t, verneMQ, collected)63}64func TestVerneMQ_Collect_ReturnsNilIfConnectionRefused(t *testing.T) {65 verneMQ := prepareVerneMQ()66 require.True(t, verneMQ.Init())67 assert.Nil(t, verneMQ.Collect())68}69func TestVerneMQ_Collect_ReturnsNilIfMetricsAreNotVerneMQ(t *testing.T) {70 verneMQ, srv := prepareClientServerNotVerneMQ(t)71 defer srv.Close()72 assert.Nil(t, verneMQ.Collect())73}74func TestVerneMQ_Collect_ReturnsNilIfReceiveInvalidResponse(t *testing.T) {75 verneMQ, ts := prepareClientServerInvalid(t)76 defer ts.Close()77 assert.Nil(t, verneMQ.Collect())78}79func TestVerneMQ_Collect_ReturnsNilIfReceiveResponse404(t *testing.T) {80 verneMQ, ts := prepareClientServerResponse404(t)81 defer ts.Close()82 assert.Nil(t, verneMQ.Collect())83}84func testCharts(t *testing.T, verneMQ *VerneMQ, collected map[string]int64) {85 ensureCollectedHasAllChartsDimsVarsIDs(t, verneMQ, collected)86}87func ensureCollectedHasAllChartsDimsVarsIDs(t *testing.T, verneMQ *VerneMQ, collected map[string]int64) {88 for _, chart := range *verneMQ.Charts() {89 for _, dim := range chart.Dims {90 _, ok := collected[dim.ID]91 assert.Truef(t, ok, "collected metrics has no data for dim '%s' chart '%s'", dim.ID, chart.ID)92 }93 for _, v := range chart.Vars {94 _, ok := collected[v.ID]95 assert.Truef(t, ok, "collected metrics has no data for var '%s' chart '%s'", v.ID, chart.ID)96 }97 }98}99func prepareVerneMQ() *VerneMQ {100 verneMQ := New()101 verneMQ.URL = "http://127.0.0.1:38001/metrics"102 return verneMQ103}104func prepareClientServerV1101(t *testing.T) (*VerneMQ, *httptest.Server) {105 t.Helper()106 ts := httptest.NewServer(http.HandlerFunc(107 func(w http.ResponseWriter, r *http.Request) {108 _, _ = w.Write(metricsV1101MQTTv5)109 }))110 verneMQ := New()111 verneMQ.URL = ts.URL112 require.True(t, verneMQ.Init())113 return verneMQ, ts114}115func prepareClientServerNotVerneMQ(t *testing.T) (*VerneMQ, *httptest.Server) {116 t.Helper()117 ts := httptest.NewServer(http.HandlerFunc(118 func(w http.ResponseWriter, r *http.Request) {119 _, _ = w.Write(invalidMetrics)120 }))121 verneMQ := New()122 verneMQ.URL = ts.URL123 require.True(t, verneMQ.Init())124 return verneMQ, ts125}126func prepareClientServerInvalid(t *testing.T) (*VerneMQ, *httptest.Server) {127 t.Helper()128 ts := httptest.NewServer(http.HandlerFunc(129 func(w http.ResponseWriter, r *http.Request) {130 _, _ = w.Write([]byte("hello and\n goodbye"))131 }))132 verneMQ := New()133 verneMQ.URL = ts.URL134 require.True(t, verneMQ.Init())135 return verneMQ, ts136}137func prepareClientServerResponse404(t *testing.T) (*VerneMQ, *httptest.Server) {138 t.Helper()139 ts := httptest.NewServer(http.HandlerFunc(140 func(w http.ResponseWriter, r *http.Request) {141 w.WriteHeader(http.StatusNotFound)142 }))143 verneMQ := New()144 verneMQ.URL = ts.URL145 require.True(t, verneMQ.Init())146 return verneMQ, ts147}148var v1101ExpectedMetrics = map[string]int64{149 "bytes_received": 36796908,150 "bytes_sent": 23361693,151 "client_keepalive_expired": 1,152 "cluster_bytes_dropped": 0,153 "cluster_bytes_received": 0,154 "cluster_bytes_sent": 0,155 "mqtt_auth_received": 0,156 "mqtt_auth_received_continue_authentication": 0,157 "mqtt_auth_received_reauthenticate": 0,...
mqtt.go
Source:mqtt.go
...71 return72}73func (mqttCol *mqttCollector) init() (err error) {74 if mqttCol.config.Active {75 opts := mqtt.NewClientOptions().AddBroker("tcp://" + mqttCol.config.Host + ":" +76 strconv.Itoa(mqttCol.config.Port)).SetClientID(mqttCol.name)77 opts.SetDefaultPublishHandler(mqttCol.newIncomingMQTTMessage)78 mqttCol.client = mqtt.NewClient(opts)79 if token := mqttCol.client.Connect(); token.Wait() && token.Error() != nil {80 err = errors.New("MQTTInitError")81 return82 }83 for i := 0; i < 3; i++ {84 go mqttCol.deliverMsgs()85 }86 // cli.numDeliverer = 387 }88 return89}90func (mqttCol *mqttCollector) close() (err error) {91 if mqttCol.config.Active {92 mqttCol.logInfo.Println("Disconnecting MQTT client")93 mqttCol.client.Disconnect(250)94 }95 err = nil96 return97}98// newIncomingMQTTMessage adds message to channel for incoming messages99func (mqttCol *mqttCollector) newIncomingMQTTMessage(client mqtt.Client, msg mqtt.Message) {100 var mqttMsg schemas.MQTTMessage101 mqttMsg.Content = msg.Payload()102 mqttMsg.Topic = msg.Topic()103 mqttCol.msgIn <- mqttMsg104}105// subscribe subscribes to specified topics106func (mqttCol *mqttCollector) subscribe(mq *AgentMQTT, topic string, qos int) (err error) {107 if !mqttCol.config.Active {108 return109 }110 mqttCol.mutex.Lock()111 ag, ok := mqttCol.subscription[topic]112 mqttCol.mutex.Unlock()113 if ok {114 subscribed := false115 for i := range ag {116 if ag[i].agentID == mq.agentID {117 subscribed = true118 break119 }120 }121 if !subscribed {122 ag = append(ag, mq)123 mqttCol.mutex.Lock()124 mqttCol.subscription[topic] = ag125 mqttCol.mutex.Unlock()126 }127 } else {128 mqttCol.mutex.Lock()129 token := mqttCol.client.Subscribe(topic, byte(qos), nil)130 mqttCol.mutex.Unlock()131 if token.Wait() && token.Error() != nil {132 err = token.Error()133 return134 }135 ag = make([]*AgentMQTT, 0)136 ag = append(ag, mq)137 mqttCol.mutex.Lock()138 mqttCol.subscription[topic] = ag139 mqttCol.mutex.Unlock()140 }141 // cli.mutex.Lock()142 // numDel := len(cli.subscription) / 25143 // if numDel > cli.numDeliverer {144 // for i := 0; i < numDel-cli.numDeliverer; i++ {145 // go cli.deliverMsgs()146 // }147 // cli.numDeliverer = numDel148 // }149 // cli.mutex.Unlock()150 return151}152// unsubscribe to a topic153func (mqttCol *mqttCollector) unsubscribe(mq *AgentMQTT, topic string) (err error) {154 if !mqttCol.config.Active {155 return156 }157 mqttCol.mutex.Lock()158 ag, ok := mqttCol.subscription[topic]159 mqttCol.mutex.Unlock()160 if !ok {161 return162 }163 index := -1164 for i := range ag {165 if mq.agentID == ag[i].agentID {166 index = i167 break168 }169 }170 if index == -1 {171 return172 }173 if index == 0 && len(ag) == 1 {174 // agent is the only one who has subscribed -> unsubscribe175 delete(mqttCol.subscription, topic)176 token := mqttCol.client.Unsubscribe(topic)177 if token.Wait() && token.Error() != nil {178 err = token.Error()179 return180 }181 } else {182 // remove agent from list of subscribed agents183 ag[index] = ag[len(ag)-1]184 ag[len(ag)-1] = nil185 ag = ag[:len(ag)-1]186 mqttCol.mutex.Lock()187 mqttCol.subscription[topic] = ag188 mqttCol.mutex.Unlock()189 }190 return191}192// publish sends a message193func (mqttCol *mqttCollector) publish(msg schemas.MQTTMessage, qos int) (err error) {194 if mqttCol.config.Active {195 token := mqttCol.client.Publish(msg.Topic, byte(qos), false, msg.Content)196 token.Wait()197 }198 return199}200// deliverMsg delivers incoming messages to agents according to their topic201func (mqttCol *mqttCollector) deliverMsgs() {202 var msg schemas.MQTTMessage203 for {204 msg = <-mqttCol.msgIn205 mqttCol.mutex.Lock()206 ag, ok := mqttCol.subscription[msg.Topic]207 mqttCol.mutex.Unlock()208 if ok {209 for i := range ag {210 ag[i].newIncomingMQTTMessage(msg)211 }212 }213 }214}215// AgentMQTT provides functions to subscribe and publish via mqtt216type AgentMQTT struct {217 collector *mqttCollector218 mutex *sync.Mutex // mutex for message inbox map219 subTopic map[string]interface{} // subscribed topics220 msgInTopic map[string]chan schemas.MQTTMessage // message inbox for messages with specified topic221 msgIn chan schemas.MQTTMessage // mqtt message inbox222 agentID int223 logger *client.AgentLogger224 logError *log.Logger225 logInfo *log.Logger226 active bool227}228// newAgentMQTT returns a new pubsub connector of type mqtt229func (mqttCol *mqttCollector) newAgentMQTT(agentID int, cmaplog *client.AgentLogger,230 logErr *log.Logger, logInf *log.Logger) (mq *AgentMQTT) {231 mq = &AgentMQTT{232 collector: mqttCol,233 mutex: &sync.Mutex{},234 agentID: agentID,235 logger: cmaplog,236 logError: logErr,237 logInfo: logInf,238 active: mqttCol.config.Active,239 }240 mq.subTopic = make(map[string]interface{})241 mq.msgInTopic = make(map[string]chan schemas.MQTTMessage)242 mq.msgIn = make(chan schemas.MQTTMessage)243 return244}245// close closes the mqtt246func (mq *AgentMQTT) close() {247 for t := range mq.subTopic {248 mq.Unsubscribe(t)249 }250 mq.mutex.Lock()251 mq.logInfo.Println("Closing MQTT of agent ", mq.agentID)252 mq.active = false253 mq.mutex.Unlock()254}255// Subscribe subscribes to a topic256func (mq *AgentMQTT) Subscribe(topic string, qos int) (err error) {257 mq.mutex.Lock()258 if !mq.active {259 mq.mutex.Unlock()260 err = errors.New("mqtt not active")261 return262 }263 _, ok := mq.subTopic[topic]264 mq.mutex.Unlock()265 if ok {266 return267 }268 mq.mutex.Lock()269 mq.subTopic[topic] = nil270 mq.mutex.Unlock()271 err = mq.collector.subscribe(mq, topic, qos)272 return273}274// Unsubscribe unsubscribes a topic275func (mq *AgentMQTT) Unsubscribe(topic string) (err error) {276 mq.mutex.Lock()277 if !mq.active {278 mq.mutex.Unlock()279 err = errors.New("mqtt not active")280 return281 }282 _, ok := mq.subTopic[topic]283 mq.mutex.Unlock()284 if !ok {285 return286 }287 mq.mutex.Lock()288 delete(mq.subTopic, topic)289 mq.mutex.Unlock()290 err = mq.collector.unsubscribe(mq, topic)291 return292}293// SendMessage sends a message294func (mq *AgentMQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {295 mq.mutex.Lock()296 if !mq.active {297 mq.mutex.Unlock()298 err = errors.New("mqtt not active")299 return300 }301 mq.mutex.Unlock()302 err = mq.collector.publish(msg, qos)303 if err != nil {304 return305 }306 err = mq.logger.NewLog("msg", "MQTT publish", msg.String())307 return308}309// NewMessage returns a new initiaized message310func (mq *AgentMQTT) NewMessage(topic string, content []byte) (msg schemas.MQTTMessage, err error) {311 msg.Topic = topic312 msg.Content = content313 err = nil314 return315}316// RecvMessages retrieves all messages since last call of this function317func (mq *AgentMQTT) RecvMessages() (num int, msgs []schemas.MQTTMessage, err error) {318 mq.mutex.Lock()319 if !mq.active {320 mq.mutex.Unlock()321 err = errors.New("mqtt not active")322 return323 }324 mq.mutex.Unlock()325 num = 0326 err = nil327 for {328 select {329 case msgtemp := <-mq.msgIn:330 msgs = append(msgs, msgtemp)331 num++332 default:333 return334 }335 }336}337// RecvMessageWait retrieves next message and blocks if no message is available338func (mq *AgentMQTT) RecvMessageWait() (msg schemas.MQTTMessage, err error) {339 mq.mutex.Lock()340 if !mq.active {341 mq.mutex.Unlock()342 err = errors.New("mqtt not active")343 return344 }345 mq.mutex.Unlock()346 err = nil347 msg = <-mq.msgIn348 return349}350// newIncomingMQTTMessage adds message to channel for incoming messages351func (mq *AgentMQTT) newIncomingMQTTMessage(msg schemas.MQTTMessage) {352 mq.mutex.Lock()353 if !mq.active {354 mq.mutex.Unlock()355 return356 }357 mq.mutex.Unlock()358 mq.logger.NewLog("msg", "MQTT receive", msg.String())359 mq.mutex.Lock()360 inbox, ok := mq.msgInTopic[msg.Topic]361 mq.mutex.Unlock()362 if ok {363 inbox <- msg364 } else {365 mq.msgIn <- msg366 }367}368func (mq *AgentMQTT) registerTopicChannel(topic string,369 topicChan chan schemas.MQTTMessage) (err error) {370 mq.mutex.Lock()371 if !mq.active {372 mq.mutex.Unlock()373 err = errors.New("mqtt not active")374 return375 }376 if _, ok := mq.msgInTopic[topic]; !ok {377 mq.msgInTopic[topic] = topicChan378 } else {379 err = errors.New("topic is already handled")380 }381 mq.mutex.Unlock()382 return383}384func (mq *AgentMQTT) deregisterTopicChannel(topic string) (err error) {385 mq.mutex.Lock()386 if _, ok := mq.msgInTopic[topic]; ok {387 delete(mq.msgInTopic, topic)388 } else {389 err = errors.New("topic is not handled")390 }391 mq.mutex.Unlock()392 return393}...
mqtt_client.go
Source:mqtt_client.go
...79func Connect() {80 //create a ClientOptions struct setting the broker address, clientid, turn81 //off trace output and set the default message handler82 // This should be replaced with env variables83 opts := MQTT.NewClientOptions().AddBroker("tcp://mosquitto:1883")84 opts.SetClientID("Home-Api")85 opts.SetDefaultPublishHandler(f)86 mqttToken, err := dockersecret.ReadSecret("token_homeapi")87 if err != nil {88 logger.Fatalln("The mqtt token cannot be read")89 return90 }91 opts.SetUsername(mqttToken)92 opts.SetPassword("_")93 opts.ConnectRetry = true94 opts.AutoReconnect = true95 opts.OnConnect = OnConnect96 opts.OnConnectionLost = func(cl MQTT.Client, err error) {97 logger.Println("Connection lost to MQTT broker")98 }99 opts.OnReconnecting = func(MQTT.Client, *MQTT.ClientOptions) {100 logger.Println("Attempting to reconnect to the MQTT broker")101 }102 //create and start a client using the above ClientOptions103 mqttClient = MQTT.NewClient(opts)104 if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {105 logger.Panicf("The mqtt client could not connect. Error: %s\n", token.Error())106 panic(token.Error())107 }108}109func OnConnect(c MQTT.Client) {110 logger.Println("Connected to the MQTT broker")111 //subscribe to the topic /go-mqtt/sample and request messages to be delivered112 //at a maximum qos of zero, wait for the receipt to confirm the subscription113 if token := mqttClient.Subscribe("v1/+/+/+/value", 0, onValue); token.Wait() && token.Error() != nil {114 logger.Println(token.Error())115 os.Exit(1)116 }117 if token := mqttClient.Subscribe("v1/+/+/+/state", 2, onState); token.Wait() && token.Error() != nil {118 logger.Println(token.Error())119 os.Exit(1)120 }121 if token := mqttClient.Subscribe("v1/+/+/+/aux/+", 2, onAux); token.Wait() && token.Error() != nil {122 logger.Println(token.Error())123 os.Exit(1)124 }125 if token := mqttClient.Subscribe("v1/+/+/status", 2, onStatus); token.Wait() && token.Error() != nil {126 logger.Println(token.Error())127 os.Exit(1)128 }129}130// GetStatus : Gets the last status of the requested sensor131func GetStatus(deviceID string) (bool, error) {132 if device, ok := sensors[deviceID]; ok {133 return device.Status, nil134 }135 return false, errors.New("The status is not available")136}137// SetState : Sets the state of the requested sensor138func SetState(locationID, deviceID, sensorID string, newState bool) error {139 if mqttClient == nil || !mqttClient.IsConnected() {140 return errors.New("The mqtt client is not connected")141 }142 topic := "v1/" + locationID + "/" + deviceID + "/" + sensorID + "/setState"143 token := mqttClient.Publish(topic, 2, false, strconv.FormatBool(newState))144 token.Wait()145 return token.Error()146}147// SetAux : Sets an aux value for the requested sensor148func SetAux(locationID, deviceID, sensorID, endpoint, value string, retain bool) error {149 if mqttClient == nil || !mqttClient.IsConnected() {150 return errors.New("The mqtt client is not connected")151 }152 topic := "v1/" + locationID + "/" + deviceID + "/" + sensorID + "/aux/" + endpoint153 token := mqttClient.Publish(topic, 2, retain, value)154 token.Wait()155 return token.Error()156}157// GetValue : Gets the last value of the requested sensor158func GetValue(deviceID, sensorID string) (float32, error) {159 if device, ok := sensors[deviceID]; ok {160 if sensor, ok := device.Sensors[sensorID]; ok {161 return sensor.Value, nil162 }163 }164 return 0.0, errors.New("The value is not available")165}166// GetState : Gets the last state of the requested sensor167func GetState(deviceID, sensorID string) (bool, error) {168 if device, ok := sensors[deviceID]; ok {169 if sensor, ok := device.Sensors[sensorID]; ok {170 return sensor.State, nil171 }172 }173 return false, errors.New("The state is not available")174}175// GetAux : Gets the last aux values of the requested sensor176func GetAux(deviceID, sensorID string) (map[string]string, error) {177 if device, ok := sensors[deviceID]; ok {178 if sensor, ok := device.Sensors[sensorID]; ok {179 if sensor.AuxValues != nil {180 return sensor.AuxValues, nil181 }182 }183 }184 return nil, errors.New("The state is not available")185}186type deviceValues struct {187 Status bool188 Sensors map[string]sensorValues189}190type sensorValues struct {191 Status bool192 State bool193 Value float32194 AuxValues map[string]string195}...
New
Using AI Code Generation
1import (2func main() {3 opts := mqtt.NewClientOptions()4 opts.SetClientID("go-simple")5 opts.SetUsername("test")6 opts.SetPassword("test")7 c := mqtt.NewClient(opts)8 if token := c.Connect(); token.Wait() && token.Error() != nil {9 panic(token.Error())10 }11 fmt.Println("Connected")12 if token := c.Subscribe("test", 0, nil); token.Wait() && token.Error() != nil {13 fmt.Println(token.Error())14 os.Exit(1)15 }16}17import (18func main() {19 opts := mqtt.NewClientOptions()20 opts.SetClientID("go-simple")21 opts.SetUsername("test")22 opts.SetPassword("test")23 c := mqtt.NewClient(opts)24 if token := c.Connect(); token.Wait() && token.Error() != nil {25 panic(token.Error())26 }27 fmt.Println("Connected")28 if token := c.Subscribe("test", 0, nil); token.Wait() && token.Error() != nil {29 fmt.Println(token.Error())30 os.Exit(1)31 }32}33./2.go:16: cannot use c (type mqtt.Client) as type mqtt.Client in argument to c.Subscribe:34 mqtt.Client does not implement mqtt.Client (wrong type for Subscribe method)35 have Subscribe("github.com/eclipse/paho.mqtt.golang".string, byte, func("github.com/eclipse/paho.mqtt.golang".Client, "github.com/eclipse/paho.mqtt.golang".Message))36 want Subscribe("github.com/eclipse/paho.mqtt.golang".string, byte, func("github.com/eclipse/paho.mqtt.golang".Client, "github.com/eclipse/paho.mqtt.golang".Message) error)
New
Using AI Code Generation
1import (2func main() {3 var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {4 fmt.Printf("TOPIC: %s\n", msg.Topic())5 fmt.Printf("MSG: %s\n", msg.Payload())6 }7 opts.SetClientID("go-simple")8 opts.SetDefaultPublishHandler(f)9 c := mqtt.NewClient(opts)10 if token := c.Connect(); token.Wait() && token.Error() != nil {11 panic(token.Error())12 }13 if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {14 fmt.Println(token.Error())15 os.Exit(1)16 }17 for i := 0; i < 5; i++ {18 text := fmt.Sprintf("this is msg #%d!", i)19 token := c.Publish("go-mqtt/sample", 0, false, text)20 token.Wait()21 }22 if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {23 fmt.Println(token.Error())24 os.Exit(1)25 }26 c.Disconnect(250)27}28import (29func main() {30 var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {31 fmt.Printf("TOPIC: %s\n", msg.Topic())32 fmt.Printf("MSG:
New
Using AI Code Generation
1import (2func main() {3 opts.SetClientID("go-simple")4 opts.SetDefaultPublishHandler(f)5 c := mqtt.NewClient(opts)6 if token := c.Connect(); token.Wait() && token.Error() != nil {7 panic(token.Error())8 }9 if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {10 fmt.Println(token.Error())11 os.Exit(1)12 }13 for {14 token := c.Publish("go-mqtt/sample", 0, false, text)15 token.Wait()16 time.Sleep(1 * time.Second)17 }18 c.Disconnect(250)19}20func f(client mqtt.Client, msg mqtt.Message) {21 fmt.Printf("Topic: %s\n", msg.Topic())22 fmt.Printf("Message: %s\n", msg.Payload())23}
New
Using AI Code Generation
1import (2func main() {3 opts.SetClientID("myclientid")4 opts.SetDefaultPublishHandler(f)5 c := mqtt.NewClient(opts)6 if token := c.Connect(); token.Wait() && token.Error() != nil {7 panic(token.Error())8 }9 if token := c.Subscribe("test", 0, nil); token.Wait() && token.Error() != nil {10 fmt.Println(token.Error())11 os.Exit(1)12 }13}14import (15func main() {16 opts.SetClientID("myclientid")17 opts.SetDefaultPublishHandler(f)18 c := mqtt.NewClient(opts)19 if token := c.Connect(); token.Wait() && token.Error() != nil {20 panic(token.Error())21 }22 if token := c.Subscribe("test", 0, nil); token.Wait() && token.Error() != nil {23 fmt.Println(token.Error())24 os.Exit(1)25 }26}27import (28func main() {29 opts.SetClientID("myclientid")30 opts.SetDefaultPublishHandler(f)31 c := mqtt.NewClient(opts)32 if token := c.Connect(); token.Wait() && token.Error() != nil {33 panic(token.Error())34 }35 if token := c.Subscribe("test", 0, nil); token.Wait() && token.Error() != nil {36 fmt.Println(token.Error())37 os.Exit(1)38 }39}40import (41func main() {42 opts.SetClientID("myclientid")43 opts.SetDefaultPublishHandler(f)44 c := mqtt.NewClient(opts)
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!