Best Venom code snippet using amqp.createAMQPSession
amqpconnection.go
Source:amqpconnection.go  
1package servicebus2import (3	"context"4	"fmt"5	"github.com/twinj/uuid"6	"net/http"7	"net/url"8	"os"9	"strings"10	log "github.com/sirupsen/logrus"11	"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"12	"github.com/Azure/azure-sdk-for-go/services/servicebus/mgmt/2017-04-01/servicebus"13	"github.com/Azure/go-autorest/autorest/azure"14	"github.com/Azure/go-autorest/autorest/to"15	"github.com/lawrencegripper/ion/internal/app/dispatcher/helpers"16	"github.com/lawrencegripper/ion/internal/pkg/types"17	"pack.ag/amqp"18)19const serviceBusRootKeyName = "RootManageSharedAccessKey"20// AmqpConnection provides a connection to service bus and methods for creating required subscriptions and topics21type AmqpConnection struct {22	subsClient           *servicebus.SubscriptionsClient23	Endpoint             string24	SubscriptionName     string25	SubscriptionAmqpPath string26	TopicName            string27	AccessKeys           servicebus.AccessKeys28	AMQPConnectionString string29	Session              *amqp.Session30	Receiver             *amqp.Receiver31	ManagementReceiver   *amqp.Receiver32	ManagementSender     *amqp.Sender33	getSubscription      func() (servicebus.SBSubscription, error)34}35// MessageCountDetails is a mirror of the SB SDK object but without pointers and things we don't need so logrus can36// log the numbers correctly37type MessageCountDetails struct {38	// ActiveMessageCount - Number of active messages in the queue, topic, or subscription.39	ActiveMessageCount int6440	// DeadLetterMessageCount - Number of messages that are dead lettered.41	DeadLetterMessageCount int6442}43// GetQueueDepth returns the current length of the sb queue44func (l *AmqpConnection) GetQueueDepth() (MessageCountDetails, error) {45	sub, err := l.getSubscription()46	if err != nil || sub.MessageCount == nil {47		return MessageCountDetails{}, err48	}49	details := sub.CountDetails50	detailsPointerless := MessageCountDetails{}51	if details.ActiveMessageCount != nil {52		
detailsPointerless.ActiveMessageCount = *details.ActiveMessageCount53	}54	if details.DeadLetterMessageCount != nil {55		detailsPointerless.DeadLetterMessageCount = *details.DeadLetterMessageCount56	}57	return detailsPointerless, nil58}59// Todo: Reconsider approach to error handling in this code.60// Move to returning err and panicing in the caller if listener creation fails.61// NewAmqpConnection initilises a servicebus lister from configuration62func NewAmqpConnection(ctx context.Context, config *types.Configuration) *AmqpConnection {63	if config == nil {64		log.Panic("Nil config not allowed")65	}66	if config.SubscribesToEvent == "" {67		log.Panic("Empty subscribesToEvent not allowed")68	}69	if config.ModuleName == "" {70		log.Panic("Empty module name not allowed")71	}72	if config.Job == nil {73		log.Panic("Job config required")74	}75	//Todo: close connection to amqp when context is cancelled/done76	listener := AmqpConnection{}77	auth := helpers.GetAzureADAuthorizer(config, azure.PublicCloud.ResourceManagerEndpoint)78	subsClient := servicebus.NewSubscriptionsClient(config.SubscriptionID)79	subsClient.Authorizer = auth80	topicsClient := servicebus.NewTopicsClient(config.SubscriptionID)81	topicsClient.Authorizer = auth82	namespaceClient := servicebus.NewNamespacesClient(config.SubscriptionID)83	namespaceClient.Authorizer = auth84	groupsClient := resources.NewGroupsClient(config.SubscriptionID)85	groupsClient.Authorizer = auth86	listener.subsClient = &subsClient87	// Check if resource group exists88	_, err := groupsClient.Get(ctx, config.ResourceGroup)89	if err != nil {90		log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed getting resource group: %v", err)91	}92	// Check namespace exists93	namespace, err := namespaceClient.Get(ctx, config.ResourceGroup, config.ServiceBusNamespace)94	if err != nil {95		log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed getting servicebus namespace: %v", err)96	}97	listener.Endpoint = 
*namespace.ServiceBusEndpoint98	keys, err := namespaceClient.ListKeys(ctx, config.ResourceGroup, config.ServiceBusNamespace, serviceBusRootKeyName)99	if err != nil {100		log.WithFields(log.Fields{101			"config":   types.RedactConfigSecrets(config),102			"response": keys,103		}).WithError(err).Panicf("Failed getting servicebus namespace")104	}105	listener.AccessKeys = keys106	listener.AMQPConnectionString = getAmqpConnectionString(*keys.KeyName, *keys.SecondaryKey, *namespace.Name)107	// Check Topic to listen on. Create a topic if missing108	topic := createTopic(ctx, topicsClient, config, config.SubscribesToEvent)109	listener.TopicName = strings.ToLower(*topic.Name)110	eventsPublished := strings.Split(config.EventsPublished, ",")111	for _, eventName := range eventsPublished {112		// Check topic to publish to. Create is missing113		createTopic(ctx, topicsClient, config, eventName)114	}115	// Check subscription to listen on. Create if missing116	subName := getSubscriptionName(config.SubscribesToEvent, config.ModuleName)117	sub, err := subsClient.Get(118		ctx,119		config.ResourceGroup,120		config.ServiceBusNamespace,121		config.SubscribesToEvent,122		subName,123	)124	listener.getSubscription = func() (servicebus.SBSubscription, error) {125		return subsClient.Get(126			ctx,127			config.ResourceGroup,128			config.ServiceBusNamespace,129			config.SubscribesToEvent,130			subName,131		)132	}133	if err != nil && sub.Response.StatusCode == http.StatusNotFound {134		log.WithField("config", types.RedactConfigSecrets(config)).Debugf("subscription %v doesn't exist.. 
creating", subName)135		deliveryCount := config.Job.RetryCount + 1136		if deliveryCount < 1 {137			log.Error("retryCount must be greater than or equal to 0")138		}139		subDef := servicebus.SBSubscription{140			SBSubscriptionProperties: &servicebus.SBSubscriptionProperties{141				MaxDeliveryCount: to.Int32Ptr(int32(deliveryCount)),142			},143		}144		sub, err = subsClient.CreateOrUpdate(145			ctx,146			config.ResourceGroup,147			config.ServiceBusNamespace,148			config.SubscribesToEvent,149			subName,150			subDef,151		)152		if err != nil {153			log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed creating subscription: %v", err)154		}155	} else if err != nil {156		log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed getting subscription: %v", err)157	}158	listener.SubscriptionName = *sub.Name159	listener.SubscriptionAmqpPath = getSubscriptionAmqpPath(config.SubscribesToEvent, config.ModuleName)160	listener.Session = createAmqpSession(&listener)161	listener.Receiver = createAmqpListener(&listener)162	listener.ManagementSender, listener.ManagementReceiver, err = listener.createAmqpSBManagementChannels(listener.TopicName, config.ModuleName)163	if err != nil {164		log.WithError(err).Error("failed to create management sender, without this renewal of message locks will fail")165	}166	return &listener167}168func swapIndex(indexOne, indexTwo int, array *[16]byte) {169	v1 := array[indexOne]170	array[indexOne] = array[indexTwo]171	array[indexTwo] = v1172}173//RenewLocks renews the locks on messages provided174func (l *AmqpConnection) RenewLocks(ctx context.Context, messages []*amqp.Message) error {175	lockTokens := make([]amqp.UUID, 0, len(messages))176	for _, m := range messages {177		expires, ok := m.Annotations["x-opt-locked-until"]178		if !ok {179			log.WithField("message", m).Error("failed to get x-opt-locked-until from message annotations")180		} else {181			log.WithField("locked-until", expires).Debug("message lock expires 
at")182		}183		if len(m.DeliveryTag) != 16 {184			return fmt.Errorf("message's deliverytag incorrect length, expected: 16 got: %v", len(m.DeliveryTag))185		}186		// Get lock token from the deliveryTag187		var lockTokenBytes [16]byte188		copy(lockTokenBytes[:], m.DeliveryTag[:16])189		// translate from .net guid byte serialisation format to amqp rfc standard190		swapIndex(0, 3, &lockTokenBytes)191		swapIndex(1, 2, &lockTokenBytes)192		swapIndex(4, 5, &lockTokenBytes)193		swapIndex(6, 7, &lockTokenBytes)194		lockTokenUUIDDeliveryTag := amqp.UUID(lockTokenBytes)195		lockTokens = append(lockTokens, lockTokenUUIDDeliveryTag)196		log.WithField("uuid", lockTokenUUIDDeliveryTag).Debug("adding lockid to renew")197	}198	if len(lockTokens) < 1 {199		return fmt.Errorf("no lock tokens present to renew")200	}201	hostname, _ := os.Hostname()202	err := l.ManagementSender.Send(ctx, &amqp.Message{203		ApplicationProperties: map[string]interface{}{204			"operation": "com.microsoft:renew-lock",205		},206		Properties: &amqp.MessageProperties{207			MessageID: uuid.NewV4().String(),208			ReplyTo:   hostname + "-receiver",209		},210		Value: map[string]interface{}{211			"lock-tokens": lockTokens,212		},213	})214	if err != nil {215		log.WithError(err).Error("failed to renew locks on active messages")216		return err217	}218	response, err := l.ManagementReceiver.Receive(ctx)219	if err != nil {220		log.WithError(err).Error("error response from server when renewing locks")221		return err222	}223	if code, exists := response.ApplicationProperties["statusCode"]; exists {224		if codeInt, valid := code.(int32); valid && codeInt != 200 {225			return fmt.Errorf("lock renewal failed, response status message: %v", response.ApplicationProperties["statusDescription"])226		}227	}228	return nil229}230func (l *AmqpConnection) createAmqpSBManagementChannels(topic, eventname string) (*amqp.Sender, *amqp.Receiver, error) {231	if l.Session == nil {232		log.WithField("currentListener", l).Panic("Cannot create 
amqp listener without a session already configured")233	}234	subscriptionAddress := getSubscriptionAmqpPath(topic, eventname) + "/$management"235	hostname, _ := os.Hostname()236	hostAddress := hostname237	// hostAddress := hostname + "/" + subscriptionAddress238	// receiver := uuid.NewV4().String()239	sender, err := l.Session.NewSender(240		amqp.LinkTargetAddress(subscriptionAddress),241		amqp.LinkSourceAddress(hostAddress+"-sender"),242	)243	if err != nil {244		log.Fatal("Creating management sender:", err)245		return nil, nil, err246	}247	reciever, err := l.Session.NewReceiver(248		amqp.LinkSourceAddress(subscriptionAddress),249		amqp.LinkTargetAddress(hostAddress+"-receiver"),250	)251	if err != nil {252		log.Fatal("Creating management receiver:", err)253		return nil, nil, err254	}255	return sender, reciever, nil256}257func createAmqpListener(listener *AmqpConnection) *amqp.Receiver {258	// Todo: how do we validate that the session is healthy?259	if listener.Session == nil {260		log.WithField("currentListener", listener).Panic("Cannot create amqp listener without a session already configured")261	}262	// Create a receiver263	receiver, err := listener.Session.NewReceiver(264		amqp.LinkSourceAddress(listener.SubscriptionAmqpPath),265		// amqp.LinkCredit(10), // Todo: Add config value to define how many inflight tasks the dispatcher can handle266	)267	if err != nil {268		log.Fatal("Creating receiver:", err)269	}270	return receiver271}272// CreateAmqpSender exists for e2e testing.273func (l *AmqpConnection) CreateAmqpSender(topic string) (*amqp.Sender, error) {274	if l.Session == nil {275		log.WithField("currentListener", l).Panic("Cannot create amqp listener without a session already configured")276	}277	sender, err := l.Session.NewSender(278		amqp.LinkTargetAddress("/" + strings.ToLower(topic)),279	)280	if err != nil {281		log.Fatal("Creating receiver:", err)282		return nil, err283	}284	return sender, nil285}286func createTopic(ctx context.Context, topicsClient 
servicebus.TopicsClient, config *types.Configuration, topicName string) servicebus.SBTopic {287	topic, err := topicsClient.Get(ctx, config.ResourceGroup, config.ServiceBusNamespace, topicName)288	if err != nil && topic.Response.Response != nil && topic.Response.StatusCode == http.StatusNotFound {289		log.WithField("config", types.RedactConfigSecrets(config)).Debugf("topic %v doesn't exist.. creating", topicName)290		topic, err = topicsClient.CreateOrUpdate(ctx, config.ResourceGroup, config.ServiceBusNamespace, topicName, servicebus.SBTopic{})291		if err != nil {292			log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed creating topic: %v", err)293		}294	} else if err != nil {295		log.WithField("config", types.RedactConfigSecrets(config)).Panicf("Failed getting topic: %v", err)296	}297	return topic298}299func createAmqpSession(listener *AmqpConnection) *amqp.Session {300	// Create client301	client, err := amqp.Dial(listener.AMQPConnectionString)302	if err != nil {303		log.Fatal("Dialing AMQP server:", err)304	}305	session, err := client.NewSession()306	if err != nil {307		log.WithError(err).Fatal("Creating session failed")308	}309	return session310}311func getAmqpConnectionString(keyName, keyValue, namespace string) string {312	encodedKey := url.QueryEscape(keyValue)313	return fmt.Sprintf("amqps://%s:%s@%s.servicebus.windows.net", keyName, encodedKey, namespace)314}315func getSubscriptionAmqpPath(eventName, moduleName string) string {316	return "/" + strings.ToLower(eventName) + "/subscriptions/" + getSubscriptionName(eventName, moduleName)317}318func getSubscriptionName(eventName, moduleName string) string {319	return strings.ToLower(eventName) + "_" + strings.ToLower(moduleName)320}...amqp.go
Source:amqp.go  
...43	var e Executor44	if err := mapstructure.Decode(step, &e); err != nil {45		return nil, err46	}47	client, session, err := e.createAMQPSession(ctx)48	if err != nil {49		return nil, err50	}51	defer client.Close()52	defer session.Close(ctx)53	switch e.ClientType {54	case "producer":55		return e.publishMessages(ctx, session)56	case "consumer":57		return e.consumeMessages(ctx, session)58	default:59		return nil, fmt.Errorf("clientType %q must be producer or consumer", e.ClientType)60	}61}62func (e Executor) createAMQPSession(ctx context.Context) (*amqp.Client, *amqp.Session, error) {63	if e.Addr == "" {64		return nil, nil, errors.New("creating session: addr is mandatory")65	}66	client, err := amqp.Dial(e.Addr, amqp.ConnSASLAnonymous())67	if err != nil {68		return nil, nil, fmt.Errorf("creating session: %w", err)69	}70	session, err := client.NewSession()71	if err != nil {72		return nil, nil, fmt.Errorf("creating session: %w", err)73	}74	return client, session, nil75}76func (e Executor) publishMessages(ctx context.Context, session *amqp.Session) (interface{}, error) {...amqp_server.go
Source:amqp_server.go  
...
// Stop closes the server's connection and returns the result of that close.
func (s *amqpServer) Stop() error {
	return s.connection.Close()
}

// register opens a new AMQP session and registers route on it. When the
// registration ends with errStoppedListening it calls itself again, which
// starts over on a fresh session; any other result is returned as is. The
// session opened by each call is handed to s.Application.Close when that call
// returns.
func (s *amqpServer) register(route Route) error {
	sess, err := s.createAMQPSession()
	if err != nil {
		return err
	}
	defer s.Application.Close(sess)

	if regErr := sess.register(route); regErr != errStoppedListening {
		return regErr
	}
	// The listener stopped: retry the registration from scratch.
	return s.register(route)
}
...
createAMQPSession
Using AI Code Generation
1import (2func main() {3	hub, err := eventhub.NewHubFromConnectionString("")4	if err != nil {5		log.Fatal(err)6	}7	defer hub.Close()8	session, err := hub.CreateAMQPSession()9	if err != nil {10		log.Fatal(err)11	}12	defer session.Close(nil)13	sender, err := session.NewSender(14		eventhub.NewEventHubPartitionSenderOptions("0"),15	if err != nil {16		log.Fatal(err)17	}18	defer sender.Close(nil)19	receiver, err := session.NewReceiver(20		eventhub.NewEventHubPartitionReceiverOptions("0"),21	if err != nil {22		log.Fatal(err)23	}24	defer receiver.Close(nil)25	event := eventhub.NewEvent([]byte("Hello, World!"))26	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)27	defer cancel()28	if err := sender.Send(ctx, event); err != nil {29		log.Fatal(err)30	}31	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)32	defer cancel()33	if err := receiver.Receive(ctx, func(ctx context.Context, event *eventhub.Event) error {34		fmt.Println(string(event.Data))35	}); err != nil {36		log.Fatal(err)37	}38}39import (40func main()createAMQPSession
Using AI Code Generation
1import (2func main() {3	if err != nil {4		fmt.Println(err)5	}6	q, err := ns.NewQueue("testqueue")7	if err != nil {8		fmt.Println(err)9	}10	session, err := q.NewAMQPSession()11	if err != nil {12		fmt.Println(err)13	}14	fmt.Println("Successfully created a new AMQP session")15}16import (17func main() {18	if err != nil {19		fmt.Println(err)20	}21	q, err := ns.NewQueue("testqueue")22	if err != nil {23		fmt.Println(err)24	}25	session, err := q.NewAMQPSession()26	if err != nil {27		fmt.Println(err)28	}29	receiver, err := session.NewReceiver()30	if err != nil {31		fmt.Println(err)32	}33	sender, err := session.NewSender()34	if err != nil {35		fmt.Println(err)36	}37	err = sender.Send(context.Background(), &servicebus.Message{38		Body: []byte("Hello World!"),39	})40	if err != nil {41		fmt.Println(err)42	}createAMQPSession
Using AI Code Generation
1import (2func main() {3	amqpInstance := NewAMQP()4	session, err := amqpInstance.createAMQPSession()5	if err != nil {6		log.Fatal(err)7	}8	if err != nil {9		log.Fatal(err)10	}11	if err != nil {12		log.Fatal(err)13	}14	msg := amqp.NewMessage([]byte("Hello World!"))15	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)16	defer cancel()17	err = sender.Send(ctx, msg)18	if err != nil {19		log.Fatal(err)20	}21	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)22	defer cancel()23	msg, err = receiver.Receive(ctx)24	if err != nil {25		log.Fatal(err)26	}27	fmt.Printf("%s", msg.GetData())28}29import (30func main() {31	amqpInstance := NewAMQP()32	client, err := amqpInstance.createAMQPClient()33	if err != nil {34		log.Fatal(err)35	}36	session, err := client.NewSession()37	if err != nil {38		log.Fatal(err)39	}40	if err != nil {41		log.Fatal(err)42	}43	if err != nil {44		log.Fatal(err)45	}46	msg := amqp.NewMessage([]byte("Hello World!"))47	ctx, cancel := context.WithTimeout(context.Background(), 10createAMQPSession
Using AI Code Generation
1func main() {2    amqp := amqp.NewAMQP()3}4func main() {5    amqp := amqp.NewAMQP()6}7func main() {8    amqp := amqp.NewAMQP()9}10func main() {11    amqp := amqp.NewAMQP()12}13func main() {14    amqp := amqp.NewAMQP()15}16func main() {17    amqp := amqp.NewAMQP()18}19func main() {20    amqp := amqp.NewAMQP()21}22func main() {23    amqp := amqp.NewAMQP()24}
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
