How to use the NewNATSBus method of the bus package

Best Testkube code snippet using bus.NewNATSBus

bus_manager.go

Source:bus_manager.go Github

copy

Full Screen

1package channelling2import (3 "errors"4 "fmt"5 "log"6 "sync"7 "time"8 "github.com/nats-io/nats"9 "natsconnection"10)11const (12 BusManagerStartup = "startup"13 BusManagerOffer = "offer"14 BusManagerAnswer = "answer"15 BusManagerBye = "bye"16 BusManagerConnect = "connect"17 BusManagerDisconnect = "disconnect"18 BusManagerSession = "session"19)20// BusManager 提供了与 消息总线进行通信的API.21type BusManager interface {22 ChannellingAPIConsumer23 Start()24 Publish(subject string, v interface{}) error25 Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error26 Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error27 Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error)28 BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error)29 BindSendChan(subject string, channel interface{}) error30 PrefixSubject(string) string31 CreateSink(string) Sink32}33// BusTrigger 作为序列化 后端系统总线 trigger 事件的容器.34type BusTrigger struct {35 Id string36 Name string37 From string38 Payload string `json:",omitempty"`39 Data interface{} `json:",omitempty"`40 Pipeline string `json:",omitempty"`41}42// BusSubjectTrigger 返回 trigger payloads 的消息主题名称.43func BusSubjectTrigger(prefix, suffix string) string {44 return fmt.Sprintf("%s.%s", prefix, suffix)45}46// NewBusManager 创建和初始化一个新的 BusManager, 根据 useNats开关决定是否使用 NATS.47// 目的是为了简化API, 封装与后端消息总线进行连接和收发数据的逻辑.48func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, subjectPrefix string) BusManager {49 var b BusManager50 var err error51 if useNats {52 b, err = newNatsBus(apiConsumer, id, subjectPrefix)53 if err == nil {54 log.Println("NATS bus connected")55 } else {56 log.Println("Error connecting NATS bus", err)57 b = &noopBus{apiConsumer, id}58 }59 } else {60 b = &noopBus{apiConsumer, id}61 }62 return b63}64type noopBus struct {65 ChannellingAPIConsumer66 id string67}68func (bus *noopBus) Start() {69 // noop70}71func (bus *noopBus) Publish(subject 
string, v interface{}) error {72 return nil73}74func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {75 return nil76}77func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error {78 return nil79}80func (bus *noopBus) PrefixSubject(subject string) string {81 return subject82}83func (bus *noopBus) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) {84 return nil, nil85}86func (bus *noopBus) BindSendChan(subject string, channel interface{}) error {87 return nil88}89func (bus *noopBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) {90 return nil, nil91}92func (bus *noopBus) CreateSink(id string) Sink {93 return nil94}95type natsBus struct {96 ChannellingAPIConsumer97 id string98 prefix string99 ec *natsconnection.EncodedConnection100 triggerQueue chan *busQueueEntry101}102func newNatsBus(apiConsumer ChannellingAPIConsumer, id, prefix string) (*natsBus, error) {103 ec, err := natsconnection.EstablishJSONEncodedConnection(nil)104 if err != nil {105 return nil, err106 }107 if prefix == "" {108 prefix = "channelling.trigger"109 }110 // Create buffered channel for outbound NATS data.111 triggerQueue := make(chan *busQueueEntry, 50)112 return &natsBus{apiConsumer, id, prefix, ec, triggerQueue}, nil113}114func (bus *natsBus) Start() {115 // Start go routine to process outbount NATS publishing.116 go chPublish(bus.ec, bus.triggerQueue)117 bus.Trigger(BusManagerStartup, bus.id, "", nil, nil)118}119func (bus *natsBus) Publish(subject string, v interface{}) error {120 return bus.ec.Publish(subject, v)121}122func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {123 return bus.ec.Request(subject, v, vPtr, timeout)124}125func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) (err error) {126 trigger := &BusTrigger{127 Id: bus.id,128 Name: name,129 
From: from,130 Payload: payload,131 Data: data,132 }133 if pipeline != nil {134 trigger.Pipeline = pipeline.GetID()135 }136 entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger}137 select {138 case bus.triggerQueue <- entry:139 // sent ok140 default:141 log.Println("Failed to queue NATS event - queue full?")142 err = errors.New("NATS trigger queue full")143 }144 return err145}146func (bus *natsBus) PrefixSubject(sub string) string {147 return fmt.Sprintf("%s.%s", bus.prefix, sub)148}149func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) {150 return bus.ec.Subscribe(subject, cb)151}152func (bus *natsBus) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) {153 return bus.ec.BindRecvChan(subject, channel)154}155func (bus *natsBus) BindSendChan(subject string, channel interface{}) error {156 return bus.ec.BindSendChan(subject, channel)157}158func (bus *natsBus) CreateSink(id string) (sink Sink) {159 sink = newNatsSink(bus, id)160 return161}162type busQueueEntry struct {163 subject string164 data interface{}165}166func chPublish(ec *natsconnection.EncodedConnection, channel chan (*busQueueEntry)) {167 for {168 entry := <-channel169 err := ec.Publish(entry.subject, entry.data)170 if err != nil {171 log.Println("Failed to publish to NATS", entry.subject, err)172 }173 }174}175type natsSink struct {176 sync.RWMutex177 id string178 bm BusManager179 closed bool180 SubjectOut string181 SubjectIn string182 sub *nats.Subscription183 sendQueue chan *DataSinkOutgoing184}185func newNatsSink(bm BusManager, id string) *natsSink {186 sink := &natsSink{187 id: id,188 bm: bm,189 SubjectOut: bm.PrefixSubject(fmt.Sprintf("sink.%s.out", id)),190 SubjectIn: bm.PrefixSubject(fmt.Sprintf("sink.%s.in", id)),191 }192 sink.sendQueue = make(chan *DataSinkOutgoing, 100)193 bm.BindSendChan(sink.SubjectOut, sink.sendQueue)194 return sink195}196func (sink *natsSink) Write(outgoing *DataSinkOutgoing) (err error) {197 if 
sink.Enabled() {198 log.Println("Sending via NATS sink", sink.SubjectOut, outgoing)199 sink.sendQueue <- outgoing200 }201 return err202}203func (sink *natsSink) Enabled() bool {204 sink.RLock()205 defer sink.RUnlock()206 return sink.closed == false207}208func (sink *natsSink) Close() {209 sink.Lock()210 defer sink.Unlock()211 if sink.sub != nil {212 err := sink.sub.Unsubscribe()213 if err != nil {214 log.Println("Failed to unsubscribe NATS sink", err)215 } else {216 sink.sub = nil217 }218 }219 sink.closed = true220}221func (sink *natsSink) Export() *DataSink {222 return &DataSink{223 SubjectOut: sink.SubjectOut,224 SubjectIn: sink.SubjectIn,225 }226}227func (sink *natsSink) BindRecvChan(channel interface{}) (*nats.Subscription, error) {228 sink.Lock()229 defer sink.Unlock()230 if sink.sub != nil {231 sink.sub.Unsubscribe()232 sink.sub = nil233 }234 sub, err := sink.bm.BindRecvChan(sink.SubjectIn, channel)235 if err != nil {236 return nil, err237 }238 sink.sub = sub239 return sub, nil240}...

Full Screen

Full Screen

NewNATSBus

Using AI Code Generation

copy

Full Screen

1import (2type Service interface {3 SayHello(context.Context, *HelloRequest) (*HelloResponse, error)4}5type HelloRequest struct {6}7type HelloResponse struct {8}9type HelloService struct{}10func (s *HelloService) SayHello(ctx context.Context, req *HelloRequest) (*HelloResponse, error) {11 if req.Name == "" {12 return nil, rpc.NewError(codes.InvalidArgument, "name is required")13 }14 return &HelloResponse{Greeting: fmt.Sprintf("Hello %s", req.Name)}, nil15}16func (s *HelloService) SayHelloAgain(ctx context.Context, req *HelloRequest) (*HelloResponse, error) {17 if req.Name == "" {18 return nil, rpc.NewError(codes.InvalidArgument, "name is required")19 }20 return &HelloResponse{Greeting: fmt.Sprintf("Hello again %s", req.Name)}, nil21}22func (s *HelloService) SayHelloAgainAgain(ctx context.Context, req *HelloRequest) (*HelloResponse, error) {23 if req.Name == "" {24 return nil, rpc.NewError(codes.InvalidArgument, "name is required")25 }26 return &HelloResponse{Greeting: fmt.Sprintf("Hello again again %s", req.Name)}, nil27}28func main() {

Full Screen

Full Screen

NewNATSBus

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 cmd.Init()4 b := nats.NewBroker()5 if err := b.Init(); err != nil {6 log.Fatal(err)7 }8 if err := b.Connect(); err != nil {9 log.Fatal(err)10 }11 if err := b.Publish("go.micro.topic.foo", &broker.Message{12 Header: map[string]string{13 },14 Body: []byte("Hello World"),15 }); err != nil {16 log.Fatal(err)17 }18 _, err := b.Subscribe("go.micro.topic.foo", func(p broker.Event) error {19 fmt.Printf("[sub] Received message: %s\n", string(p.Message().Body))20 })21 if err != nil {22 log.Fatal(err)23 }24 select {}25}26import (27func main() {28 cmd.Init()29 b := nats.NewNATSStreamingBroker(30 nats.NATSClusterID("test-cluster"),

Full Screen

Full Screen

NewNATSBus

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 if err != nil {4 log.Fatal(err)5 }6 err = natsBus.Connect()7 if err != nil {8 log.Fatal(err)9 }10 err = natsBus.Publish("test", []byte("Hello World"))11 if err != nil {12 log.Fatal(err)13 }14 natsBus.Subscribe("test", func(msg *bus.Message) {15 fmt.Println("Received a message: ", string(msg.Data))16 })17 select {}18}19import (20func main() {21 if err != nil {22 log.Fatal(err)23 }24 err = natsBus.Connect()25 if err != nil {26 log.Fatal(err)27 }28 err = natsBus.Publish("test", []byte("Hello World"))29 if err != nil {30 log.Fatal(err)31 }32 natsBus.Subscribe("test", func(msg *bus.Message) {33 fmt.Println("Received a message: ", string(msg.Data))34 })35 select {}36}37import (38func main() {39 if err != nil {40 log.Fatal(err)41 }42 err = natsBus.Connect()43 if err != nil {44 log.Fatal(err)45 }46 err = natsBus.Publish("test", []byte("Hello World"))47 if err != nil {48 log.Fatal(err)49 }50 natsBus.Subscribe("test", func(msg *bus.Message) {51 fmt.Println("Received a message: ", string(msg.Data))52 })53 select {}54}55import (

Full Screen

Full Screen

NewNATSBus

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 bus := golbus.NewNATSBus()4 bus.Connect()5 bus.Publish("test", "Hello World!")6 bus.Subscribe("test", func(msg string) {7 fmt.Println("Received a message: ", msg)8 })9 bus.Disconnect()10}11import (12func main() {13 bus := golbus.NewRedisBus()14 bus.Connect()15 bus.Publish("test", "Hello World!")16 bus.Subscribe("test", func(msg string) {17 fmt.Println("Received a message: ", msg)18 })19 bus.Disconnect()20}21import (22func main() {23 bus := golbus.NewZeroMQBus()24 bus.Connect()25 bus.Publish("test", "Hello World!")26 bus.Subscribe("test", func(msg string) {27 fmt.Println("Received a message: ", msg)28 })29 bus.Disconnect()30}31import (32func main() {33 bus := golbus.NewKafkaBus()34 bus.Connect()35 bus.Publish("test", "Hello World!")36 bus.Subscribe("test", func(msg string) {37 fmt.Println("Received a message: ", msg)38 })39 bus.Disconnect()40}41import (42func main() {43 bus := golbus.NewAMQPBus()44 bus.Connect()45 bus.Publish("test", "Hello World!")46 bus.Subscribe("test", func(msg string) {47 fmt.Println("Received a message: ", msg)48 })49 bus.Disconnect()50}51import (

Full Screen

Full Screen

NewNATSBus

Using AI Code Generation

copy

Full Screen

1func main() {2 bus := bus.NewNATSBus()3 bus.Connect("localhost", 4222)4 bus.Subscribe("foo", func(msg *bus.Message) {5 fmt.Printf("Received a message: %s\n", string(msg.Data))6 })7 bus.Publish("foo", []byte("Hello World"))8 bus.Disconnect()9}10func main() {11 bus := bus.NewNATSStreamingBus()12 bus.Subscribe("foo", func(msg *bus.Message) {13 fmt.Printf("Received a message: %s\n", string(msg.Data))14 })15 bus.Publish("foo", []byte("Hello World"))16 bus.Disconnect()17}18func main() {19 bus := bus.NewKafkaBus()20 bus.Connect([]string{"localhost:9092"})21 bus.Subscribe("foo", func(msg *bus.Message) {22 fmt.Printf("Received a message: %s\n", string(msg.Data))23 })24 bus.Publish("foo", []byte("Hello World"))25 bus.Disconnect()26}27func main() {28 bus := bus.NewRabbitMQBus()

Full Screen

Full Screen

NewNATSBus

Using AI Code Generation

copy

Full Screen

1func main() {2 var (3 ctx = context.Background()4 b := bus.NewNATSBus()5 if err = b.Connect(ctx); err != nil {6 panic(err)7 }8 defer b.Close()9 s := bus.NewSubscriber(b)10 s.RegisterHandler("test", func(msg *bus.Message) error {11 fmt.Printf("received message: %s\n", msg.Data)12 })13 if err = s.Subscribe(ctx, "test"); err != nil {14 panic(err)15 }16 p := bus.NewPublisher(b)17 if err = p.Publish(ctx, "test", []byte("hello world")); err != nil {18 panic(err)19 }20 time.Sleep(time.Second)21}22Please see [examples](

Full Screen

Full Screen

NewNATSBus

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 b := bus.NewNATSBus("localhost:4222")4 b.Publish("test", []byte("Hello World"))5 b.Subscribe("test", func(m []byte) {6 fmt.Println(string(m))7 })8 b.Wait()9}10import (11func main() {12 b := bus.NewNATSBus("localhost:4222")13 b.Subscribe("test", func(m []byte) {14 fmt.Println(string(m))15 })16 b.Wait()17}18import (19func main() {20 b := bus.NewNATSBus("localhost:4222")21 b.Subscribe("test", func(m []byte) {22 fmt.Println(string(m))23 })24 b.Wait()25}26import (27func main() {28 b := bus.NewNATSBus("localhost:4222")29 b.Publish("test", []byte("Hello World"))30 b.Subscribe("test", func(m []byte) {31 fmt.Println(string(m))32 })33 b.Wait()34}35import (36func main() {37 b := bus.NewNATSBus("localhost

Full Screen

Full Screen

NewNATSBus

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 bus.Publish("test", []byte("Hello World"))4 bus.Subscribe("test", func(msg []byte) {5 fmt.Println(string(msg))6 })7 bus.Close()8}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Testkube automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful