How to use the createAMQPSession method of the amqp package

Best Venom code snippet using amqp.createAMQPSession

amqpconnection.go

Source:amqpconnection.go Github

copy

Full Screen

1package servicebus2import (3 "context"4 "fmt"5 "github.com/twinj/uuid"6 "net/http"7 "net/url"8 "os"9 "strings"10 log "github.com/sirupsen/logrus"11 "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"12 "github.com/Azure/azure-sdk-for-go/services/servicebus/mgmt/2017-04-01/servicebus"13 "github.com/Azure/go-autorest/autorest/azure"14 "github.com/Azure/go-autorest/autorest/to"15 "github.com/lawrencegripper/ion/internal/app/dispatcher/helpers"16 "github.com/lawrencegripper/ion/internal/pkg/types"17 "pack.ag/amqp"18)19const serviceBusRootKeyName = "RootManageSharedAccessKey"20// AmqpConnection provides a connection to service bus and methods for creating required subscriptions and topics21type AmqpConnection struct {22 subsClient *servicebus.SubscriptionsClient23 Endpoint string24 SubscriptionName string25 SubscriptionAmqpPath string26 TopicName string27 AccessKeys servicebus.AccessKeys28 AMQPConnectionString string29 Session *amqp.Session30 Receiver *amqp.Receiver31 ManagementReceiver *amqp.Receiver32 ManagementSender *amqp.Sender33 getSubscription func() (servicebus.SBSubscription, error)34}35// MessageCountDetails is a mirror of the SB SDK object but without pointers and things we don't need so logrus can36// log the numbers correctly37type MessageCountDetails struct {38 // ActiveMessageCount - Number of active messages in the queue, topic, or subscription.39 ActiveMessageCount int6440 // DeadLetterMessageCount - Number of messages that are dead lettered.41 DeadLetterMessageCount int6442}43// GetQueueDepth returns the current length of the sb queue44func (l *AmqpConnection) GetQueueDepth() (MessageCountDetails, error) {45 sub, err := l.getSubscription()46 if err != nil || sub.MessageCount == nil {47 return MessageCountDetails{}, err48 }49 details := sub.CountDetails50 detailsPointerless := MessageCountDetails{}51 if details.ActiveMessageCount != nil {52 detailsPointerless.ActiveMessageCount = *details.ActiveMessageCount53 }54 if 
details.DeadLetterMessageCount != nil {55 detailsPointerless.DeadLetterMessageCount = *details.DeadLetterMessageCount56 }57 return detailsPointerless, nil58}59// Todo: Reconsider approach to error handling in this code.60// Move to returning err and panicing in the caller if listener creation fails.61// NewAmqpConnection initilises a servicebus lister from configuration62func NewAmqpConnection(ctx context.Context, config *types.Configuration) *AmqpConnection {63 if config == nil {64 log.Panic("Nil config not allowed")65 }66 if config.SubscribesToEvent == "" {67 log.Panic("Empty subscribesToEvent not allowed")68 }69 if config.ModuleName == "" {70 log.Panic("Empty module name not allowed")71 }72 if config.Job == nil {73 log.Panic("Job config required")74 }75 //Todo: close connection to amqp when context is cancelled/done76 listener := AmqpConnection{}77 auth := helpers.GetAzureADAuthorizer(config, azure.PublicCloud.ResourceManagerEndpoint)78 subsClient := servicebus.NewSubscriptionsClient(config.SubscriptionID)79 subsClient.Authorizer = auth80 topicsClient := servicebus.NewTopicsClient(config.SubscriptionID)81 topicsClient.Authorizer = auth82 namespaceClient := servicebus.NewNamespacesClient(config.SubscriptionID)83 namespaceClient.Authorizer = auth84 groupsClient := resources.NewGroupsClient(config.SubscriptionID)85 groupsClient.Authorizer = auth86 listener.subsClient = &subsClient87 // Check if resource group exists88 _, err := groupsClient.Get(ctx, config.ResourceGroup)89 if err != nil {90 log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed getting resource group: %v", err)91 }92 // Check namespace exists93 namespace, err := namespaceClient.Get(ctx, config.ResourceGroup, config.ServiceBusNamespace)94 if err != nil {95 log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed getting servicebus namespace: %v", err)96 }97 listener.Endpoint = *namespace.ServiceBusEndpoint98 keys, err := namespaceClient.ListKeys(ctx, 
config.ResourceGroup, config.ServiceBusNamespace, serviceBusRootKeyName)99 if err != nil {100 log.WithFields(log.Fields{101 "config": types.RedactConfigSecrets(config),102 "response": keys,103 }).WithError(err).Panicf("Failed getting servicebus namespace")104 }105 listener.AccessKeys = keys106 listener.AMQPConnectionString = getAmqpConnectionString(*keys.KeyName, *keys.SecondaryKey, *namespace.Name)107 // Check Topic to listen on. Create a topic if missing108 topic := createTopic(ctx, topicsClient, config, config.SubscribesToEvent)109 listener.TopicName = strings.ToLower(*topic.Name)110 eventsPublished := strings.Split(config.EventsPublished, ",")111 for _, eventName := range eventsPublished {112 // Check topic to publish to. Create is missing113 createTopic(ctx, topicsClient, config, eventName)114 }115 // Check subscription to listen on. Create if missing116 subName := getSubscriptionName(config.SubscribesToEvent, config.ModuleName)117 sub, err := subsClient.Get(118 ctx,119 config.ResourceGroup,120 config.ServiceBusNamespace,121 config.SubscribesToEvent,122 subName,123 )124 listener.getSubscription = func() (servicebus.SBSubscription, error) {125 return subsClient.Get(126 ctx,127 config.ResourceGroup,128 config.ServiceBusNamespace,129 config.SubscribesToEvent,130 subName,131 )132 }133 if err != nil && sub.Response.StatusCode == http.StatusNotFound {134 log.WithField("config", types.RedactConfigSecrets(config)).Debugf("subscription %v doesn't exist.. 
creating", subName)135 deliveryCount := config.Job.RetryCount + 1136 if deliveryCount < 1 {137 log.Error("retryCount must be greater than or equal to 0")138 }139 subDef := servicebus.SBSubscription{140 SBSubscriptionProperties: &servicebus.SBSubscriptionProperties{141 MaxDeliveryCount: to.Int32Ptr(int32(deliveryCount)),142 },143 }144 sub, err = subsClient.CreateOrUpdate(145 ctx,146 config.ResourceGroup,147 config.ServiceBusNamespace,148 config.SubscribesToEvent,149 subName,150 subDef,151 )152 if err != nil {153 log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed creating subscription: %v", err)154 }155 } else if err != nil {156 log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed getting subscription: %v", err)157 }158 listener.SubscriptionName = *sub.Name159 listener.SubscriptionAmqpPath = getSubscriptionAmqpPath(config.SubscribesToEvent, config.ModuleName)160 listener.Session = createAmqpSession(&listener)161 listener.Receiver = createAmqpListener(&listener)162 listener.ManagementSender, listener.ManagementReceiver, err = listener.createAmqpSBManagementChannels(listener.TopicName, config.ModuleName)163 if err != nil {164 log.WithError(err).Error("failed to create management sender, without this renewal of message locks will fail")165 }166 return &listener167}168func swapIndex(indexOne, indexTwo int, array *[16]byte) {169 v1 := array[indexOne]170 array[indexOne] = array[indexTwo]171 array[indexTwo] = v1172}173//RenewLocks renews the locks on messages provided174func (l *AmqpConnection) RenewLocks(ctx context.Context, messages []*amqp.Message) error {175 lockTokens := make([]amqp.UUID, 0, len(messages))176 for _, m := range messages {177 expires, ok := m.Annotations["x-opt-locked-until"]178 if !ok {179 log.WithField("message", m).Error("failed to get x-opt-locked-until from message annotations")180 } else {181 log.WithField("locked-until", expires).Debug("message lock expires at")182 }183 if len(m.DeliveryTag) != 16 {184 
return fmt.Errorf("message's deliverytag incorrect length, expected: 16 got: %v", len(m.DeliveryTag))185 }186 // Get lock token from the deliveryTag187 var lockTokenBytes [16]byte188 copy(lockTokenBytes[:], m.DeliveryTag[:16])189 // translate from .net guid byte serialisation format to amqp rfc standard190 swapIndex(0, 3, &lockTokenBytes)191 swapIndex(1, 2, &lockTokenBytes)192 swapIndex(4, 5, &lockTokenBytes)193 swapIndex(6, 7, &lockTokenBytes)194 lockTokenUUIDDeliveryTag := amqp.UUID(lockTokenBytes)195 lockTokens = append(lockTokens, lockTokenUUIDDeliveryTag)196 log.WithField("uuid", lockTokenUUIDDeliveryTag).Debug("adding lockid to renew")197 }198 if len(lockTokens) < 1 {199 return fmt.Errorf("no lock tokens present to renew")200 }201 hostname, _ := os.Hostname()202 err := l.ManagementSender.Send(ctx, &amqp.Message{203 ApplicationProperties: map[string]interface{}{204 "operation": "com.microsoft:renew-lock",205 },206 Properties: &amqp.MessageProperties{207 MessageID: uuid.NewV4().String(),208 ReplyTo: hostname + "-receiver",209 },210 Value: map[string]interface{}{211 "lock-tokens": lockTokens,212 },213 })214 if err != nil {215 log.WithError(err).Error("failed to renew locks on active messages")216 return err217 }218 response, err := l.ManagementReceiver.Receive(ctx)219 if err != nil {220 log.WithError(err).Error("error response from server when renewing locks")221 return err222 }223 if code, exists := response.ApplicationProperties["statusCode"]; exists {224 if codeInt, valid := code.(int32); valid && codeInt != 200 {225 return fmt.Errorf("lock renewal failed, response status message: %v", response.ApplicationProperties["statusDescription"])226 }227 }228 return nil229}230func (l *AmqpConnection) createAmqpSBManagementChannels(topic, eventname string) (*amqp.Sender, *amqp.Receiver, error) {231 if l.Session == nil {232 log.WithField("currentListener", l).Panic("Cannot create amqp listener without a session already configured")233 }234 subscriptionAddress := 
getSubscriptionAmqpPath(topic, eventname) + "/$management"235 hostname, _ := os.Hostname()236 hostAddress := hostname237 // hostAddress := hostname + "/" + subscriptionAddress238 // receiver := uuid.NewV4().String()239 sender, err := l.Session.NewSender(240 amqp.LinkTargetAddress(subscriptionAddress),241 amqp.LinkSourceAddress(hostAddress+"-sender"),242 )243 if err != nil {244 log.Fatal("Creating management sender:", err)245 return nil, nil, err246 }247 reciever, err := l.Session.NewReceiver(248 amqp.LinkSourceAddress(subscriptionAddress),249 amqp.LinkTargetAddress(hostAddress+"-receiver"),250 )251 if err != nil {252 log.Fatal("Creating management receiver:", err)253 return nil, nil, err254 }255 return sender, reciever, nil256}257func createAmqpListener(listener *AmqpConnection) *amqp.Receiver {258 // Todo: how do we validate that the session is healthy?259 if listener.Session == nil {260 log.WithField("currentListener", listener).Panic("Cannot create amqp listener without a session already configured")261 }262 // Create a receiver263 receiver, err := listener.Session.NewReceiver(264 amqp.LinkSourceAddress(listener.SubscriptionAmqpPath),265 // amqp.LinkCredit(10), // Todo: Add config value to define how many inflight tasks the dispatcher can handle266 )267 if err != nil {268 log.Fatal("Creating receiver:", err)269 }270 return receiver271}272// CreateAmqpSender exists for e2e testing.273func (l *AmqpConnection) CreateAmqpSender(topic string) (*amqp.Sender, error) {274 if l.Session == nil {275 log.WithField("currentListener", l).Panic("Cannot create amqp listener without a session already configured")276 }277 sender, err := l.Session.NewSender(278 amqp.LinkTargetAddress("/" + strings.ToLower(topic)),279 )280 if err != nil {281 log.Fatal("Creating receiver:", err)282 return nil, err283 }284 return sender, nil285}286func createTopic(ctx context.Context, topicsClient servicebus.TopicsClient, config *types.Configuration, topicName string) servicebus.SBTopic {287 topic, 
err := topicsClient.Get(ctx, config.ResourceGroup, config.ServiceBusNamespace, topicName)288 if err != nil && topic.Response.Response != nil && topic.Response.StatusCode == http.StatusNotFound {289 log.WithField("config", types.RedactConfigSecrets(config)).Debugf("topic %v doesn't exist.. creating", topicName)290 topic, err = topicsClient.CreateOrUpdate(ctx, config.ResourceGroup, config.ServiceBusNamespace, topicName, servicebus.SBTopic{})291 if err != nil {292 log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed creating topic: %v", err)293 }294 } else if err != nil {295 log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed getting topic: %v", err)296 }297 return topic298}299func createAmqpSession(listener *AmqpConnection) *amqp.Session {300 // Create client301 client, err := amqp.Dial(listener.AMQPConnectionString)302 if err != nil {303 log.Fatal("Dialing AMQP server:", err)304 }305 session, err := client.NewSession()306 if err != nil {307 log.WithError(err).Fatal("Creating session failed")308 }309 return session310}311func getAmqpConnectionString(keyName, keyValue, namespace string) string {312 encodedKey := url.QueryEscape(keyValue)313 return fmt.Sprintf("amqps://%s:%s@%s.servicebus.windows.net", keyName, encodedKey, namespace)314}315func getSubscriptionAmqpPath(eventName, moduleName string) string {316 return "/" + strings.ToLower(eventName) + "/subscriptions/" + getSubscriptionName(eventName, moduleName)317}318func getSubscriptionName(eventName, moduleName string) string {319 return strings.ToLower(eventName) + "_" + strings.ToLower(moduleName)320}...

Full Screen

Full Screen

amqp.go

Source:amqp.go Github

copy

Full Screen

...43 var e Executor44 if err := mapstructure.Decode(step, &e); err != nil {45 return nil, err46 }47 client, session, err := e.createAMQPSession(ctx)48 if err != nil {49 return nil, err50 }51 defer client.Close()52 defer session.Close(ctx)53 switch e.ClientType {54 case "producer":55 return e.publishMessages(ctx, session)56 case "consumer":57 return e.consumeMessages(ctx, session)58 default:59 return nil, fmt.Errorf("clientType %q must be producer or consumer", e.ClientType)60 }61}62func (e Executor) createAMQPSession(ctx context.Context) (*amqp.Client, *amqp.Session, error) {63 if e.Addr == "" {64 return nil, nil, errors.New("creating session: addr is mandatory")65 }66 client, err := amqp.Dial(e.Addr, amqp.ConnSASLAnonymous())67 if err != nil {68 return nil, nil, fmt.Errorf("creating session: %w", err)69 }70 session, err := client.NewSession()71 if err != nil {72 return nil, nil, fmt.Errorf("creating session: %w", err)73 }74 return client, session, nil75}76func (e Executor) publishMessages(ctx context.Context, session *amqp.Session) (interface{}, error) {...

Full Screen

Full Screen

amqp_server.go

Source:amqp_server.go Github

copy

Full Screen

...41func (s *amqpServer) Stop() error {42 return s.connection.Close()43}44func (s *amqpServer) register(route Route) error {45 session, err := s.createAMQPSession()46 if err != nil {47 return err48 }49 defer s.Application.Close(session)50 err = session.register(route)51 if err == errStoppedListening {52 return s.register(route)53 }54 return err55}...

Full Screen

Full Screen

createAMQPSession

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 hub, err := eventhub.NewHubFromConnectionString("")4 if err != nil {5 log.Fatal(err)6 }7 defer hub.Close()8 session, err := hub.CreateAMQPSession()9 if err != nil {10 log.Fatal(err)11 }12 defer session.Close(nil)13 sender, err := session.NewSender(14 eventhub.NewEventHubPartitionSenderOptions("0"),15 if err != nil {16 log.Fatal(err)17 }18 defer sender.Close(nil)19 receiver, err := session.NewReceiver(20 eventhub.NewEventHubPartitionReceiverOptions("0"),21 if err != nil {22 log.Fatal(err)23 }24 defer receiver.Close(nil)25 event := eventhub.NewEvent([]byte("Hello, World!"))26 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)27 defer cancel()28 if err := sender.Send(ctx, event); err != nil {29 log.Fatal(err)30 }31 ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)32 defer cancel()33 if err := receiver.Receive(ctx, func(ctx context.Context, event *eventhub.Event) error {34 fmt.Println(string(event.Data))35 }); err != nil {36 log.Fatal(err)37 }38}39import (40func main()

Full Screen

Full Screen

createAMQPSession

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 if err != nil {4 fmt.Println(err)5 }6 q, err := ns.NewQueue("testqueue")7 if err != nil {8 fmt.Println(err)9 }10 session, err := q.NewAMQPSession()11 if err != nil {12 fmt.Println(err)13 }14 fmt.Println("Successfully created a new AMQP session")15}16import (17func main() {18 if err != nil {19 fmt.Println(err)20 }21 q, err := ns.NewQueue("testqueue")22 if err != nil {23 fmt.Println(err)24 }25 session, err := q.NewAMQPSession()26 if err != nil {27 fmt.Println(err)28 }29 receiver, err := session.NewReceiver()30 if err != nil {31 fmt.Println(err)32 }33 sender, err := session.NewSender()34 if err != nil {35 fmt.Println(err)36 }37 err = sender.Send(context.Background(), &servicebus.Message{38 Body: []byte("Hello World!"),39 })40 if err != nil {41 fmt.Println(err)42 }

Full Screen

Full Screen

createAMQPSession

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 amqpInstance := NewAMQP()4 session, err := amqpInstance.createAMQPSession()5 if err != nil {6 log.Fatal(err)7 }8 if err != nil {9 log.Fatal(err)10 }11 if err != nil {12 log.Fatal(err)13 }14 msg := amqp.NewMessage([]byte("Hello World!"))15 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)16 defer cancel()17 err = sender.Send(ctx, msg)18 if err != nil {19 log.Fatal(err)20 }21 ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)22 defer cancel()23 msg, err = receiver.Receive(ctx)24 if err != nil {25 log.Fatal(err)26 }27 fmt.Printf("%s", msg.GetData())28}29import (30func main() {31 amqpInstance := NewAMQP()32 client, err := amqpInstance.createAMQPClient()33 if err != nil {34 log.Fatal(err)35 }36 session, err := client.NewSession()37 if err != nil {38 log.Fatal(err)39 }40 if err != nil {41 log.Fatal(err)42 }43 if err != nil {44 log.Fatal(err)45 }46 msg := amqp.NewMessage([]byte("Hello World!"))47 ctx, cancel := context.WithTimeout(context.Background(), 10

Full Screen

Full Screen

createAMQPSession

Using AI Code Generation

copy

Full Screen

1func main() {2 amqp := amqp.NewAMQP()3}4func main() {5 amqp := amqp.NewAMQP()6}7func main() {8 amqp := amqp.NewAMQP()9}10func main() {11 amqp := amqp.NewAMQP()12}13func main() {14 amqp := amqp.NewAMQP()15}16func main() {17 amqp := amqp.NewAMQP()18}19func main() {20 amqp := amqp.NewAMQP()21}22func main() {23 amqp := amqp.NewAMQP()24}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Venom automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful