How to use Run method of amqp Package

Best Venom code snippet using amqp.Run

dialer_test.go

Source:dialer_test.go Github

copy

Full Screen

...39 dialer.Close()40 // Output:41}42func TestOptions(main *testing.T) {43 main.Run("NoURL", func(t *testing.T) {44 defer goleak.VerifyNone(t)45 ctrl := gomock.NewController(t)46 defer ctrl.Finish()47 _, err := amqpextra.NewDialer()48 require.EqualError(t, err, "url(s) must be set")49 })50 main.Run("ZeroRetryPeriod", func(t *testing.T) {51 defer goleak.VerifyNone(t)52 ctrl := gomock.NewController(t)53 defer ctrl.Finish()54 _, err := amqpextra.NewDialer(55 amqpextra.WithURL("URL"),56 amqpextra.WithRetryPeriod(time.Duration(0)),57 )58 require.EqualError(t, err, "retryPeriod must be greater then zero")59 })60 main.Run("NegativeRetryPeriod", func(t *testing.T) {61 defer goleak.VerifyNone(t)62 ctrl := gomock.NewController(t)63 defer ctrl.Finish()64 _, err := amqpextra.NewDialer(65 amqpextra.WithURL("URL"),66 amqpextra.WithRetryPeriod(time.Duration(-1)),67 )68 require.EqualError(t, err, "retryPeriod must be greater then zero")69 })70 main.Run("ErrorEmptyURl", func(t *testing.T) {71 defer goleak.VerifyNone(t)72 ctrl := gomock.NewController(t)73 defer ctrl.Finish()74 _, err := amqpextra.NewDialer(75 amqpextra.WithURL("", "url"),76 )77 require.EqualError(t, err, "url(s) must be not empty")78 })79 main.Run("NoTLSConfig", func(t *testing.T) {80 defer goleak.VerifyNone(t)81 ctrl := gomock.NewController(t)82 defer ctrl.Finish()83 dialer, err := amqpextra.NewDialer(84 amqpextra.WithURL("url"),85 )86 require.Equal(t, nil, err)87 dialer.Close()88 })89 main.Run("WithTLSConfig", func(t *testing.T) {90 defer goleak.VerifyNone(t)91 ctrl := gomock.NewController(t)92 defer ctrl.Finish()93 dialer, err := amqpextra.NewDialer(94 amqpextra.WithURL("url"),95 amqpextra.WithTLS(&tls.Config{MinVersion: tls.VersionTLS13}),96 )97 assert.Equal(t, nil, err)98 dialer.Close()99 })100}101func TestConnectState(main *testing.T) {102 main.Run("CloseWhileDialingErrored", func(t *testing.T) {103 defer goleak.VerifyNone(t)104 l := logger.NewTest()105 stateCh := make(chan amqpextra.State, 2)106 dialer, err := amqpextra.NewDialer(107 amqpextra.WithURL("amqp://rabbitmq.host"),108 amqpextra.WithAMQPDial(amqpDialStub(time.Millisecond*50, fmt.Errorf("dialing errored"))),109 amqpextra.WithLogger(l),110 amqpextra.WithNotify(stateCh),111 )112 require.NoError(t, err)113 time.Sleep(time.Millisecond * 10)114 assertUnready(t, stateCh, "dialing errored")115 dialer.Close()116 assertClosed(t, dialer)117 assert.Equal(t, `[DEBUG] connection unready118[DEBUG] dialing119[DEBUG] connection unready: dialing errored120[DEBUG] connection closed121`, l.Logs())122 })123 main.Run("CloseWhileDialing", func(t *testing.T) {124 defer goleak.VerifyNone(t)125 ctrl := gomock.NewController(t)126 defer ctrl.Finish()127 l := logger.NewTest()128 closeCh := make(chan *amqp.Error)129 stateCh := make(chan amqpextra.State, 2)130 conn := mock_amqpextra.NewMockAMQPConnection(ctrl)131 conn.EXPECT().Close().Return(nil)132 conn.EXPECT().NotifyClose(any()).Return(closeCh)133 dialer, err := amqpextra.NewDialer(134 amqpextra.WithURL("amqp://rabbitmq.host"),135 amqpextra.WithAMQPDial(amqpDialStub(time.Millisecond*150, conn)),136 amqpextra.WithLogger(l),137 amqpextra.WithNotify(stateCh),138 )139 require.NoError(t, err)140 time.Sleep(time.Millisecond * 100)141 assertReady(t, stateCh)142 dialer.Close()143 assertClosed(t, dialer)144 assert.Equal(t, `[DEBUG] connection unready145[DEBUG] dialing146[DEBUG] connection ready147[DEBUG] connection closed148`, l.Logs())149 })150 main.Run("CloseByContextWhileDialing", func(t *testing.T) {151 defer goleak.VerifyNone(t)152 ctrl := gomock.NewController(t)153 defer ctrl.Finish()154 l := logger.NewTest()155 closeCh := make(chan *amqp.Error)156 stateCh := make(chan amqpextra.State, 2)157 conn := mock_amqpextra.NewMockAMQPConnection(ctrl)158 conn.EXPECT().Close().Return(nil)159 conn.EXPECT().NotifyClose(any()).Return(closeCh)160 ctx, cancelFunc := context.WithCancel(context.Background())161 defer cancelFunc()162 dialer, err := amqpextra.NewDialer(163 amqpextra.WithURL("amqp://rabbitmq.host"),164 amqpextra.WithAMQPDial(amqpDialStub(time.Millisecond*150, conn)),165 amqpextra.WithLogger(l),166 amqpextra.WithContext(ctx),167 amqpextra.WithNotify(stateCh),168 )169 require.NoError(t, err)170 time.Sleep(time.Millisecond * 100)171 assertReady(t, stateCh)172 cancelFunc()173 assertClosed(t, dialer)174 assert.Equal(t, `[DEBUG] connection unready175[DEBUG] dialing176[DEBUG] connection ready177[DEBUG] connection closed178`, l.Logs())179 })180 main.Run("NoConnWhileDialing", func(t *testing.T) {181 defer goleak.VerifyNone(t)182 ctrl := gomock.NewController(t)183 defer ctrl.Finish()184 l := logger.NewTest()185 closeCh := make(chan *amqp.Error)186 stateCh := make(chan amqpextra.State, 2)187 amqpConn := mock_amqpextra.NewMockAMQPConnection(ctrl)188 amqpConn.EXPECT().Close().Return(nil)189 amqpConn.EXPECT().NotifyClose(any()).Return(closeCh)190 dialer, err := amqpextra.NewDialer(191 amqpextra.WithURL("amqp://rabbitmq.host"),192 amqpextra.WithAMQPDial(amqpDialStub(time.Millisecond*150, amqpConn)),193 amqpextra.WithLogger(l),194 amqpextra.WithNotify(stateCh),195 )196 require.NoError(t, err)197 assertNoConn(t, dialer.ConnectionCh())198 assertReady(t, stateCh)199 dialer.Close()200 assertClosed(t, dialer)201 assert.Equal(t, `[DEBUG] connection unready202[DEBUG] dialing203[DEBUG] connection ready204[DEBUG] connection closed205`, l.Logs())206 })207 main.Run("CloseWhileWaitRetry", func(t *testing.T) {208 defer goleak.VerifyNone(t)209 ctrl := gomock.NewController(t)210 defer ctrl.Finish()211 l := logger.NewTest()212 stateCh := make(chan amqpextra.State, 2)213 dialer, err := amqpextra.NewDialer(214 amqpextra.WithURL("amqp://rabbitmq.host"),215 amqpextra.WithNotify(stateCh),216 amqpextra.WithRetryPeriod(time.Millisecond*150),217 amqpextra.WithAMQPDial(amqpDialStub(fmt.Errorf("the error"))),218 amqpextra.WithLogger(l),219 )220 require.NoError(t, err)221 time.Sleep(time.Millisecond * 100)222 assertUnready(t, stateCh, "the error")223 dialer.Close()224 assertClosed(t, dialer)225 assert.Equal(t, `[DEBUG] connection unready226[DEBUG] dialing227[DEBUG] connection unready: the error228[DEBUG] connection closed229`, l.Logs())230 })231 main.Run("CloseByContextWhileWaitRetry", func(t *testing.T) {232 defer goleak.VerifyNone(t)233 ctrl := gomock.NewController(t)234 defer ctrl.Finish()235 l := logger.NewTest()236 stateCh := make(chan amqpextra.State, 2)237 ctx, cancelFunc := context.WithCancel(context.Background())238 defer cancelFunc()239 dialer, err := amqpextra.NewDialer(240 amqpextra.WithURL("amqp://rabbitmq.host"),241 amqpextra.WithAMQPDial(amqpDialStub(fmt.Errorf("the error"))),242 amqpextra.WithRetryPeriod(time.Millisecond*150),243 amqpextra.WithNotify(stateCh),244 amqpextra.WithLogger(l),245 amqpextra.WithContext(ctx),246 )247 require.NoError(t, err)248 time.Sleep(time.Millisecond * 100)249 assertUnready(t, stateCh, "the error")250 cancelFunc()251 assertClosed(t, dialer)252 assert.Equal(t, `[DEBUG] connection unready253[DEBUG] dialing254[DEBUG] connection unready: the error255[DEBUG] connection closed256`, l.Logs())257 })258 main.Run("NoConnWhileWaitRetry", func(t *testing.T) {259 defer goleak.VerifyNone(t)260 ctrl := gomock.NewController(t)261 defer ctrl.Finish()262 l := logger.NewTest()263 stateCh := make(chan amqpextra.State, 2)264 dialer, err := amqpextra.NewDialer(265 amqpextra.WithURL("amqp://rabbitmq.host"),266 amqpextra.WithAMQPDial(amqpDialStub(fmt.Errorf("the error"))),267 amqpextra.WithRetryPeriod(time.Millisecond*150),268 amqpextra.WithNotify(stateCh),269 amqpextra.WithLogger(l),270 )271 require.NoError(t, err)272 assertUnready(t, stateCh, "the error")273 assertNoConn(t, dialer.ConnectionCh())274 dialer.Close()275 assertClosed(t, dialer)276 assert.Equal(t, `[DEBUG] connection unready277[DEBUG] dialing278[DEBUG] connection unready: the error279[DEBUG] connection closed280`, l.Logs())281 })282 main.Run("ReadyAfterWaitRetry", func(t *testing.T) {283 defer goleak.VerifyNone(t)284 ctrl := gomock.NewController(t)285 defer ctrl.Finish()286 l := logger.NewTest()287 closeCh := make(chan *amqp.Error)288 stateCh := make(chan amqpextra.State, 2)289 amqpConn := mock_amqpextra.NewMockAMQPConnection(ctrl)290 amqpConn.EXPECT().Close().Return(nil)291 amqpConn.EXPECT().NotifyClose(any()).Return(closeCh)292 dialer, err := amqpextra.NewDialer(293 amqpextra.WithURL("amqp://rabbitmq.host"),294 amqpextra.WithNotify(stateCh),295 amqpextra.WithAMQPDial(amqpDialStub(fmt.Errorf("the error"), amqpConn)),296 amqpextra.WithRetryPeriod(time.Millisecond*100),297 amqpextra.WithLogger(l),298 )299 require.NoError(t, err)300 defer dialer.Close()301 assertNoConn(t, dialer.ConnectionCh())302 assertUnready(t, stateCh, "the error")303 assertReady(t, stateCh)304 conn := <-dialer.ConnectionCh()305 assertConnNotLost(t, conn)306 dialer.Close()307 assertClosed(t, dialer)308 assert.Equal(t, `[DEBUG] connection unready309[DEBUG] dialing310[DEBUG] connection unready: the error311[DEBUG] dialing312[DEBUG] connection ready313[DEBUG] connection closed314`, l.Logs())315 })316 main.Run("GetConnectionTimeout", func(t *testing.T) {317 defer goleak.VerifyNone(t)318 ctrl := gomock.NewController(t)319 defer ctrl.Finish()320 l := logger.NewTest()321 dialer, err := amqpextra.NewDialer(322 amqpextra.WithURL("amqp://rabbitmq.host"),323 amqpextra.WithAMQPDial(amqpDialStub(time.Millisecond*75, fmt.Errorf("the error"))),324 amqpextra.WithLogger(l),325 )326 require.NoError(t, err)327 ctx, cancelFunc := context.WithTimeout(context.Background(), time.Millisecond*50)328 defer cancelFunc()329 _, err = dialer.Connection(ctx)330 require.EqualError(t, err, "context deadline exceeded")331 dialer.Close()332 assertClosed(t, dialer)333 })334 main.Run("GetConnectionDialerClosed", func(t *testing.T) {335 defer goleak.VerifyNone(t)336 ctrl := gomock.NewController(t)337 defer ctrl.Finish()338 l := logger.NewTest()339 dialer, err := amqpextra.NewDialer(340 amqpextra.WithURL("amqp://rabbitmq.host"),341 amqpextra.WithAMQPDial(amqpDialStub(time.Millisecond*75, fmt.Errorf("the error"))),342 amqpextra.WithLogger(l),343 )344 require.NoError(t, err)345 dialer.Close()346 assertClosed(t, dialer)347 _, err = dialer.Connection(context.Background())348 require.EqualError(t, err, "connection closed")349 })350 main.Run("UrlOrderInDial", func(t *testing.T) {351 defer goleak.VerifyNone(t)352 ctrl := gomock.NewController(t)353 defer ctrl.Finish()354 urls := []string{355 "the.first.url",356 "the.second.url",357 "the.last.url",358 }359 index := 0360 dialSub := func(url string, config amqp.Config) (amqpextra.AMQPConnection, error) {361 require.Equal(t, url,362 []string{363 "the.first.url",364 "the.second.url",365 "the.last.url",366 "the.first.url",367 }[index])368 index++369 return nil, errors.New("the error")370 }371 d, err := amqpextra.NewDialer(372 amqpextra.WithRetryPeriod(time.Millisecond*20),373 amqpextra.WithURL(urls...),374 amqpextra.WithAMQPDial(dialSub),375 )376 require.NoError(t, err)377 time.Sleep(time.Millisecond * 70)378 d.Close()379 })380}381func TestNotify(main *testing.T) {382 main.Run("PanicIfStateChUnbuffered", func(t *testing.T) {383 defer goleak.VerifyNone(t)384 ctrl := gomock.NewController(t)385 defer ctrl.Finish()386 stateCh := make(chan amqpextra.State)387 amqpConn := mock_amqpextra.NewMockAMQPConnection(ctrl)388 require.PanicsWithValue(t, "state chan is unbuffered", func() {389 d, _ := amqpextra.NewDialer(390 amqpextra.WithAMQPDial(391 amqpDialStub(amqpConn),392 ),393 )394 d.Notify(stateCh)395 })396 })397 main.Run("NoChangeStateWhileDial", func(t *testing.T) {398 defer goleak.VerifyNone(t)399 stateCh := make(chan amqpextra.State, 1)400 l := logger.NewTest()401 d, err := amqpextra.NewDialer(402 amqpextra.WithURL("amqp://rabbitmq.host"),403 amqpextra.WithLogger(l),404 amqpextra.WithAMQPDial(405 amqpDialStub(time.Millisecond*100, fmt.Errorf("the error")),406 ),407 )408 require.NoError(t, err)409 newStateCh := d.Notify(make(chan amqpextra.State, 1))410 assertUnready(t, newStateCh, amqp.ErrClosed.Error())411 assertNoStateChanged(t, stateCh)412 time.Sleep(time.Millisecond * 100)413 assertUnready(t, newStateCh, "the error")414 d.Close()415 assertClosed(t, d)416 expected := `[DEBUG] connection unready417[DEBUG] dialing418[DEBUG] connection unready: the error419[DEBUG] connection closed420`421 require.Equal(t, expected, l.Logs())422 })423 main.Run("ReadyIfConnected", func(t *testing.T) {424 defer goleak.VerifyNone(t)425 ctrl := gomock.NewController(t)426 defer ctrl.Finish()427 l := logger.NewTest()428 stateCh := make(chan amqpextra.State, 2)429 conn := mock_amqpextra.NewMockAMQPConnection(ctrl)430 conn.EXPECT().NotifyClose(any()).AnyTimes()431 conn.EXPECT().Close().AnyTimes()432 d, err := amqpextra.NewDialer(433 amqpextra.WithURL("amqp://rabbitmq.host"),434 amqpextra.WithLogger(l),435 amqpextra.WithAMQPDial(436 amqpDialStub(conn),437 ),438 )439 require.NoError(t, err)440 defer d.Close()441 assertNoStateChanged(t, stateCh)442 newStateCh := d.Notify(make(chan amqpextra.State, 1))443 assertReady(t, newStateCh)444 d.Close()445 assertClosed(t, d)446 expected := `[DEBUG] connection unready447[DEBUG] dialing448[DEBUG] connection ready449[DEBUG] connection closed450`451 require.Equal(t, expected, l.Logs())452 })453 main.Run("UnreadyWhileWaitRetry", func(t *testing.T) {454 defer goleak.VerifyNone(t)455 ctrl := gomock.NewController(t)456 defer ctrl.Finish()457 l := logger.NewTest()458 stateCh := make(chan amqpextra.State, 1)459 conn := mock_amqpextra.NewMockAMQPConnection(ctrl)460 conn.EXPECT().NotifyClose(any()).AnyTimes()461 conn.EXPECT().Close().AnyTimes()462 d, err := amqpextra.NewDialer(463 amqpextra.WithURL("amqp://rabbitmq.host"),464 amqpextra.WithLogger(l),465 amqpextra.WithAMQPDial(466 amqpDialStub(time.Millisecond*10, errors.New("the error")),467 ),468 amqpextra.WithRetryPeriod(time.Millisecond*100),469 )470 require.NoError(t, err)471 newStateCh := d.Notify(stateCh)472 assertUnready(t, newStateCh, amqp.ErrClosed.Error())473 assertUnready(t, newStateCh, "the error")474 d.Close()475 assertClosed(t, d)476 expected := `[DEBUG] connection unready477[DEBUG] dialing478[DEBUG] connection unready: the error479[DEBUG] connection closed480`481 require.Equal(t, expected, l.Logs())482 })483 main.Run("UnreadyAfterClosed", func(t *testing.T) {484 defer goleak.VerifyNone(t)485 ctrl := gomock.NewController(t)486 defer ctrl.Finish()487 l := logger.NewTest()488 conn := mock_amqpextra.NewMockAMQPConnection(ctrl)489 conn.EXPECT().NotifyClose(any()).AnyTimes()490 conn.EXPECT().Close().AnyTimes()491 d, err := amqpextra.NewDialer(492 amqpextra.WithURL("amqp://rabbitmq.host"),493 amqpextra.WithLogger(l),494 amqpextra.WithAMQPDial(495 amqpDialStub(time.Millisecond*10, conn),496 ),497 )498 require.NoError(t, err)499 newStateCh := d.Notify(make(chan amqpextra.State, 2))500 d.Close()501 assertClosed(t, d)502 assertUnready(t, newStateCh, amqp.ErrClosed.Error())503 expected := `[DEBUG] connection unready504[DEBUG] dialing505[DEBUG] connection closed506`507 require.Equal(t, expected, l.Logs())508 })509}510func TestConnectedState(main *testing.T) {511 main.Run("Ready", func(t *testing.T) {512 defer goleak.VerifyNone(t)513 ctrl := gomock.NewController(t)514 defer ctrl.Finish()515 l := logger.NewTest()516 closeCh := make(chan *amqp.Error)517 stateCh := make(chan amqpextra.State, 2)518 amqpConn := mock_amqpextra.NewMockAMQPConnection(ctrl)519 amqpConn.EXPECT().Close().Return(nil)520 amqpConn.EXPECT().NotifyClose(any()).Return(closeCh)521 dialer, err := amqpextra.NewDialer(522 amqpextra.WithURL("amqp://rabbitmq.host"),523 amqpextra.WithNotify(stateCh),524 amqpextra.WithAMQPDial(amqpDialStub(amqpConn)),525 amqpextra.WithLogger(l),526 )527 require.NoError(t, err)528 assertReady(t, stateCh)529 conn := <-dialer.ConnectionCh()530 assertConnNotLost(t, conn)531 dialer.Close()532 assertClosed(t, dialer)533 assert.Equal(t, `[DEBUG] connection unready534[DEBUG] dialing535[DEBUG] connection ready536[DEBUG] connection closed537`, l.Logs())538 })539 main.Run("ReconnectOnError", func(t *testing.T) {540 defer goleak.VerifyNone(t)541 ctrl := gomock.NewController(t)542 defer ctrl.Finish()543 l := logger.NewTest()544 stateCh := make(chan amqpextra.State, 2)545 closeCh0 := make(chan *amqp.Error, 1)546 amqpConn0 := mock_amqpextra.NewMockAMQPConnection(ctrl)547 amqpConn0.EXPECT().NotifyClose(any()).Return(closeCh0)548 amqpConn0.EXPECT().Close().Return(nil)549 closeCh1 := make(chan *amqp.Error, 1)550 amqpConn1 := mock_amqpextra.NewMockAMQPConnection(ctrl)551 amqpConn1.EXPECT().NotifyClose(any()).Return(closeCh1)552 amqpConn1.EXPECT().Close().Return(nil)553 dialer, err := amqpextra.NewDialer(554 amqpextra.WithNotify(stateCh),555 amqpextra.WithURL("amqp://rabbitmq.host"),556 amqpextra.WithAMQPDial(amqpDialStub(amqpConn0, amqpConn1)),557 amqpextra.WithLogger(l),558 )559 require.NoError(t, err)560 assertReady(t, stateCh)561 conn0 := <-dialer.ConnectionCh()562 assertConnNotLost(t, conn0)563 closeCh0 <- amqp.ErrClosed564 assertConnLost(t, conn0)565 assertUnready(t, stateCh, amqp.ErrClosed.Error())566 conn1 := <-dialer.ConnectionCh()567 assertConnNotLost(t, conn1)568 assertReady(t, stateCh)569 dialer.Close()570 assertClosed(t, dialer)571 assert.Equal(t, `[DEBUG] connection unready572[DEBUG] dialing573[DEBUG] connection ready574[DEBUG] connection unready: Exception (504) Reason: "channel/connection is not open"575[DEBUG] dialing576[DEBUG] connection ready577[DEBUG] connection closed578`, l.Logs())579 })580 main.Run("ReconnectOnClose", func(t *testing.T) {581 defer goleak.VerifyNone(t)582 ctrl := gomock.NewController(t)583 defer ctrl.Finish()584 l := logger.NewTest()585 stateCh := make(chan amqpextra.State, 1)586 closeCh0 := make(chan *amqp.Error, 1)587 amqpConn0 := mock_amqpextra.NewMockAMQPConnection(ctrl)588 amqpConn0.EXPECT().NotifyClose(any()).Return(closeCh0)589 amqpConn0.EXPECT().Close().Return(nil)590 closeCh1 := make(chan *amqp.Error, 1)591 amqpConn1 := mock_amqpextra.NewMockAMQPConnection(ctrl)592 amqpConn1.EXPECT().NotifyClose(any()).Return(closeCh1)593 amqpConn1.EXPECT().Close().Return(nil)594 dialer, err := amqpextra.NewDialer(595 amqpextra.WithNotify(stateCh),596 amqpextra.WithURL("amqp://rabbitmq.host"),597 amqpextra.WithAMQPDial(amqpDialStub(amqpConn0, amqpConn1)),598 amqpextra.WithLogger(l),599 )600 require.NoError(t, err)601 conn0 := <-dialer.ConnectionCh()602 assertConnNotLost(t, conn0)603 assertReady(t, stateCh)604 assertNoStateChanged(t, stateCh)605 close(closeCh0)606 assertConnLost(t, conn0)607 assertUnready(t, stateCh, amqp.ErrClosed.Error())608 conn1 := <-dialer.ConnectionCh()609 assertConnNotLost(t, conn1)610 assertReady(t, stateCh)611 dialer.Close()612 assertClosed(t, dialer)613 assert.Equal(t, `[DEBUG] connection unready614[DEBUG] dialing615[DEBUG] connection ready616[DEBUG] connection unready: Exception (504) Reason: "channel/connection is not open"617[DEBUG] dialing618[DEBUG] connection ready619[DEBUG] connection closed620`, l.Logs())621 })622 main.Run("ConnectionCloseIgnoreErrClosed", func(t *testing.T) {623 defer goleak.VerifyNone(t)624 ctrl := gomock.NewController(t)625 defer ctrl.Finish()626 l := logger.NewTest()627 stateCh := make(chan amqpextra.State, 2)628 closeCh0 := make(chan *amqp.Error, 1)629 amqpConn := mock_amqpextra.NewMockAMQPConnection(ctrl)630 amqpConn.EXPECT().NotifyClose(any()).Return(closeCh0)631 amqpConn.EXPECT().Close().Return(amqp.ErrClosed)632 dialer, err := amqpextra.NewDialer(633 amqpextra.WithNotify(stateCh),634 amqpextra.WithURL("amqp://rabbitmq.host"),635 amqpextra.WithAMQPDial(amqpDialStub(amqpConn)),636 amqpextra.WithLogger(l),637 )638 require.NoError(t, err)639 assertReady(t, stateCh)640 dialer.Close()641 assertClosed(t, dialer)642 assert.Equal(t, `[DEBUG] connection unready643[DEBUG] dialing644[DEBUG] connection ready645[DEBUG] connection closed646`, l.Logs())647 })648 main.Run("ConnectionCloseErrored", func(t *testing.T) {649 defer goleak.VerifyNone(t)650 ctrl := gomock.NewController(t)651 defer ctrl.Finish()652 l := logger.NewTest()653 closeCh0 := make(chan *amqp.Error, 1)654 stateCh := make(chan amqpextra.State, 2)655 amqpConn := mock_amqpextra.NewMockAMQPConnection(ctrl)656 amqpConn.EXPECT().NotifyClose(any()).Return(closeCh0)657 amqpConn.EXPECT().Close().Return(fmt.Errorf("connection closed errored"))658 dialer, err := amqpextra.NewDialer(659 amqpextra.WithNotify(stateCh),660 amqpextra.WithURL("amqp://rabbitmq.host"),661 amqpextra.WithAMQPDial(amqpDialStub(amqpConn)),662 amqpextra.WithLogger(l),...

Full Screen

Full Screen

inbound_test.go

Source:inbound_test.go Github

copy

Full Screen

...31 pool, err := dctest.NewPool("")32 if err != nil {33 panic(fmt.Sprintf("pool: %v", err))34 }35 rabbitResource, err := pool.RunWithOptions(&dctest.RunOptions{36 Repository: dockerRabbitImage, Tag: dockerRabbitTag, Env: []string{},37 PortBindings: map[dc.Port][]dc.PortBinding{38 "5672/tcp": {{HostIP: "", HostPort: "5673"}},39 },40 })41 if err != nil {42 panic(fmt.Sprintf("run with options: %v", err))43 }44 defer func() {45 if err = pool.Purge(rabbitResource); err != nil {46 panic(fmt.Sprintf("purge: %v", err))47 }48 }()49 if err := checkRabbit(); err != nil {50 panic(fmt.Sprintf("check Rabbit: %v", err))51 }52 code = m.Run()53}54func checkRabbit() error {55 const retries = 6056 return backoff.Retry(func() error {57 _, err := amqp.Dial(amqpAddr)58 return err59 }, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), retries))60}61func TestInboundTransport(t *testing.T) {62 t.Run("test inbound transport - with host/port", func(t *testing.T) {63 addr := amqpAddr64 inbound, err := NewInbound(addr, externalAddr, "queue", "", "")65 require.NoError(t, err)66 require.Equal(t, externalAddr, inbound.Endpoint())67 })68 t.Run("test inbound transport - with host/port, no external address", func(t *testing.T) {69 internalAddr := "amqp://example.com:5672"70 _, err := NewInbound(internalAddr, "", "queue", "", "")71 require.Error(t, err)72 })73 t.Run("test inbound transport - without host/port", func(t *testing.T) {74 port := ":5555"75 inbound, err := NewInbound(port, externalAddr, "queue", "", "")76 require.NoError(t, err)77 mockPackager := &mockpackager.Packager{UnpackValue: &transport.Envelope{Message: []byte("data")}}78 err = inbound.Start(&mockProvider{packagerValue: mockPackager})79 require.Error(t, err)80 })81 t.Run("test inbound transport - nil context", func(t *testing.T) {82 addr := amqpAddr83 inbound, err := NewInbound(addr, externalAddr, "queue", "", "")84 require.NoError(t, err)85 require.NotEmpty(t, inbound)86 err = inbound.Start(nil)87 require.Error(t, err)88 })89 t.Run("test inbound transport - invalid TLS", func(t *testing.T) {90 addr := amqpAddr91 inbound, err := NewInbound(addr, externalAddr, "queue", "invalid", "invalid")92 require.NoError(t, err)93 mockPackager := &mockpackager.Packager{UnpackValue: &transport.Envelope{Message: []byte("data")}}94 err = inbound.Start(&mockProvider{packagerValue: mockPackager})95 require.Error(t, err)96 require.Contains(t, err.Error(), "invalid cert")97 })98 t.Run("test inbound transport - invalid port number", func(t *testing.T) {99 _, err := NewInbound("", "", "", "", "")100 require.Error(t, err)101 require.Contains(t, err.Error(), "AMQP URL is mandatory")102 })103}104func TestInboundDataProcessing(t *testing.T) {105 t.Run("test inbound transport - multiple invocation with same client", func(t *testing.T) {106 addr := amqpAddr107 queue := "queue1"108 // initiate inbound with port109 inbound, err := NewInbound(addr, "http://example.com", queue, "", "")110 require.NoError(t, err)111 require.NotEmpty(t, inbound)112 // start server113 mockPackager := &mockpackager.Packager{UnpackValue: &transport.Envelope{Message: []byte("valid-data")}}114 err = inbound.Start(&mockProvider{packagerValue: mockPackager})115 require.NoError(t, err)116 // create ws client117 ch, cleanup := amqpClient(t, addr)118 defer cleanup()119 wait := make(chan amqp.Confirmation, 5)120 _ = ch.NotifyPublish(wait)121 for i := 1; i <= 5; i++ {122 err = ch.Publish(123 "", // exchange124 queue, // routing key125 false, // mandatory126 false, // immediate127 amqp.Publishing{128 ContentType: "test/test",129 Body: []byte("random"),130 })131 require.NoError(t, err)132 <-wait133 }134 err = inbound.Stop()135 require.NoError(t, err)136 })137 t.Run("test inbound transport - unpacking error", func(t *testing.T) {138 addr := amqpAddr139 queue := "queue2"140 inbound, err := NewInbound(addr, "http://example.com", queue, "", "")141 require.NoError(t, err)142 require.NotEmpty(t, inbound)143 // start server144 mockPackager := &mockpackager.Packager{UnpackErr: errors.New("error unpacking")}145 err = inbound.Start(&mockProvider{packagerValue: mockPackager})146 require.NoError(t, err)147 // create ws client148 ch, cleanup := amqpClient(t, addr)149 defer cleanup()150 wait := make(chan amqp.Confirmation, 1)151 _ = ch.NotifyPublish(wait)152 err = ch.Publish(153 "", // exchange154 queue, // routing key155 false, // mandatory156 false, // immediate157 amqp.Publishing{158 ContentType: "text/plain",159 Body: []byte(""),160 })161 <-wait162 require.NoError(t, err)163 })164 t.Run("test inbound transport - message handler error", func(t *testing.T) {165 addr := amqpAddr166 queue := "queue3"167 inbound, err := NewInbound(addr, "http://example.com", queue, "", "")168 require.NoError(t, err)169 require.NotEmpty(t, inbound)170 // start server171 mockPackager := &mockpackager.Packager{UnpackValue: &transport.Envelope{Message: []byte("invalid-data")}}172 err = inbound.Start(&mockProvider{packagerValue: mockPackager})173 require.NoError(t, err)174 // create ws client175 ch, cleanup := amqpClient(t, addr)176 defer cleanup()177 wait := make(chan amqp.Confirmation, 1)178 _ = ch.NotifyPublish(wait)...

Full Screen

Full Screen

client.go

Source:client.go Github

copy

Full Screen

...6 "time"7 "github.com/streadway/amqp"8)9const (10 noRun = iota11 run12)13var (14 // ErrNoConnection is an indicator that currently there is no connection15 // available16 ErrNoConnection = errors.New("No connection available")17)18// ClientOpt is a Client's functional option type19type ClientOpt func(*Client)20// Client is a Main AMQP client wrapper21type Client struct {22 addr string23 declarations []Declaration24 consumers map[*Consumer]struct{}25 publishers map[*Publisher]struct{}26 errs chan error27 blocking chan amqp.Blocking28 run int32 // bool29 conn atomic.Value //*amqp.Connection30 bo Backoffer31 attempt int3232 l sync.Mutex33 config amqp.Config34}35// Declare used to declare queues/exchanges/bindings.36// Declaration is saved and will be re-run every time Client gets connection37func (c *Client) Declare(d []Declaration) {38 c.l.Lock()39 defer c.l.Unlock()40 c.declarations = append(c.declarations, d...)41 if ch, err := c.channel(); err == nil {42 for _, declare := range d {43 declare(ch)44 }45 }46}47// Consume used to declare consumers48func (c *Client) Consume(cons *Consumer) {49 c.l.Lock()50 defer c.l.Unlock()51 c.consumers[cons] = struct{}{}52 if ch, err := c.channel(); err == nil {53 go cons.serve(c, ch)54 }55}56func (c *Client) deleteConsumer(cons *Consumer) {57 c.l.Lock()58 defer c.l.Unlock()59 delete(c.consumers, cons)60}61// Publish used to declare publishers62func (c *Client) Publish(pub *Publisher) {63 c.l.Lock()64 defer c.l.Unlock()65 c.publishers[pub] = struct{}{}66 if ch, err := c.channel(); err == nil {67 go pub.serve(c, ch)68 }69}70func (c *Client) deletePublisher(pub *Publisher) {71 c.l.Lock()72 defer c.l.Unlock()73 delete(c.publishers, pub)74}75// Errors returns AMQP connection level errors. Default buffer size is 100.76// Messages will be dropped in case if receiver can't keep up77func (c *Client) Errors() <-chan error {78 return c.errs79}80// Blocking notifies the server's TCP flow control of the Connection. Default81// buffer size is 10. Messages will be dropped in case if receiver can't keep up82func (c *Client) Blocking() <-chan amqp.Blocking {83 return c.blocking84}85// Close shutdown the client86func (c *Client) Close() {87 atomic.StoreInt32(&c.run, noRun) // c.run = false88 conn, _ := c.conn.Load().(*amqp.Connection)89 if conn != nil {90 conn.Close()91 }92 c.conn.Store((*amqp.Connection)(nil))93}94// Loop should be run as condition for `for` with receiving from (*Client).Errors()95//96// It will manage AMQP connection, run queue and exchange declarations, consumers.97// Will start to return false once (*Client).Close() called.98func (c *Client) Loop() bool {99 var (100 err error101 )102 if atomic.LoadInt32(&c.run) == noRun {103 return false104 }105 conn, _ := c.conn.Load().(*amqp.Connection)106 if conn != nil {107 return true108 }109 if c.bo != nil {110 time.Sleep(c.bo.Backoff(int(c.attempt)))111 atomic.AddInt32(&c.attempt, 1)112 }113 // set default Heartbeat to 10 seconds like in original amqp.Dial114 if c.config.Heartbeat == 0 {115 c.config.Heartbeat = 10 * time.Second116 }...

Full Screen

Full Screen

Run

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 if err != nil {4 fmt.Println(err)5 }6 ch, err := conn.Channel()7 if err != nil {8 fmt.Println(err)9 }10 ch.QueueDeclare("hello", false, false, false, false, nil)11 msg := amqp.Publishing{ContentType: "text/plain", Body: []byte("Hello World!")}12 ch.Publish("", "hello", false, false, msg)13 fmt.Println(" [x] Sent 'Hello World!'")14}15import (16func main() {17 if err != nil {18 fmt.Println(err)19 }20 ch, err := conn.Channel()21 if err != nil {22 fmt.Println(err)23 }24 msgs, err := ch.Consume("hello", "", true, false, false, false, nil)25 if err != nil {26 fmt.Println(err)27 }28 for msg := range msgs {29 fmt.Println("Received a message: ", string(msg.Body))30 }31}

Full Screen

Full Screen

Run

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 if err != nil {4 log.Fatalf("Error connecting to RabbitMQ: %s", err)5 }6 defer conn.Close()7 ch, err := conn.Channel()8 if err != nil {9 log.Fatalf("Error opening channel: %s", err)10 }11 defer ch.Close()12 q, err := ch.QueueDeclare(13 if err != nil {14 log.Fatalf("Error declaring queue: %s", err)15 }16 err = ch.Publish(17 amqp.Publishing{18 Body: []byte(body),19 })20 if err != nil {21 log.Fatalf("Error publishing message: %s", err)22 }23 fmt.Printf(" [x] Sent %s", body)24}25import (26func main() {27 if err != nil {28 log.Fatalf("Error connecting to RabbitMQ: %s", err)29 }30 defer conn.Close()31 ch, err := conn.Channel()32 if err != nil {33 log.Fatalf("Error opening channel: %s", err)34 }35 defer ch.Close()36 q, err := ch.QueueDeclare(37 if err != nil {38 log.Fatalf("Error declaring queue: %s", err)39 }

Full Screen

Full Screen

Run

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 if err != nil {4 log.Fatal(err)5 }6 defer conn.Close()7 ch, err := conn.Channel()8 if err != nil {9 log.Fatal(err)10 }11 defer ch.Close()12 q, err := ch.QueueDeclare(13 if err != nil {14 log.Fatal(err)15 }16 err = ch.Publish(17 amqp.Publishing{18 Body: []byte(body),19 })20 if err != nil {21 log.Fatal(err)22 }23 fmt.Println(" [x] Sent ", body)24}25import (26func main() {27 if err != nil {28 log.Fatal(err)29 }30 defer conn.Close()31 ch, err := conn.Channel()32 if err != nil {33 log.Fatal(err)34 }35 defer ch.Close()36 q, err := ch.QueueDeclare(37 if err != nil {38 log.Fatal(err)39 }40 msgs, err := ch.Consume(

Full Screen

Full Screen

Run

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 if err != nil {4 fmt.Println("Error connecting to rabbitmq server")5 }6 defer conn.Close()7 ch, err := conn.Channel()8 if err != nil {9 fmt.Println("Error creating channel")10 }11 defer ch.Close()12 q, err := ch.QueueDeclare("hello", false, false, false, false, nil)13 if err != nil {14 fmt.Println("Error declaring queue")15 }16 err = ch.Publish("", q.Name, false, false, amqp.Publishing{17 Body: []byte("Hello World!"),18 })19 if err != nil {20 fmt.Println("Error publishing message")21 }22}23import (24func main() {25 if err != nil {26 fmt.Println("Error connecting to rabbitmq server")27 }28 defer conn.Close()29 ch, err := conn.Channel()30 if err != nil {31 fmt.Println("Error creating channel")32 }33 defer ch.Close()34 q, err := ch.QueueDeclare("hello", false, false, false, false, nil)35 if err != nil {36 fmt.Println("Error declaring queue")37 }38 msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)39 if err != nil {40 fmt.Println("Error consuming message")41 }42 forever := make(chan bool)43 go func() {44 for d := range msgs {45 fmt.Println("Received a message: ", string(d.Body))46 }47 }()48 fmt.Println("Press CTRL+C to exit")49}50import (

Full Screen

Full Screen

Run

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 defer conn.Close()4 ch, _ := conn.Channel()5 defer ch.Close()6 q, _ := ch.QueueDeclare(7 err := ch.Publish(8 amqp.Publishing{9 Body: []byte(body),10 })11 failOnError(err, "Failed to publish a message")12 fmt.Println(" [x] Sent %s", body)13}14import (15func failOnError(err error, msg string) {16 if err != nil {17 fmt.Printf("%s: %s", msg, err)18 panic(fmt.Sprintf("%s: %s", msg, err))19 }20}21func main() {22 defer conn.Close()23 ch, _ := conn.Channel()24 defer ch.Close()25 q, _ := ch.QueueDeclare(26 msgs, _ := ch.Consume(

Full Screen

Full Screen

Run

Using AI Code Generation

copy

Full Screen

1import "fmt"2func main() {3 fmt.Println("Hello, playground")4 amqp := amqp{}5 amqp.Run()6}7import "fmt"8func main() {9 fmt.Println("Hello, playground")10 amqp := amqp{}11 amqp.Run()12}13import "fmt"14func main() {15 fmt.Println("Hello, playground")16 amqp := amqp{}17 amqp.Run()18}19import "fmt"20func main() {21 fmt.Println("Hello, playground")22 amqp := amqp{}23 amqp.Run()24}25import "fmt"26func main() {27 fmt.Println("Hello, playground")28 amqp := amqp{}29 amqp.Run()30}31import "fmt"32func main() {33 fmt.Println("Hello, playground")34 amqp := amqp{}35 amqp.Run()36}37import "fmt"38func main() {39 fmt.Println("Hello, playground")40 amqp := amqp{}41 amqp.Run()42}43import "fmt"44func main() {45 fmt.Println("Hello, playground")46 amqp := amqp{}47 amqp.Run()48}49import "fmt"50func main() {51 fmt.Println("Hello, playground")52 amqp := amqp{}53 amqp.Run()54}55import "fmt"56func main() {57 fmt.Println("Hello, playground")58 amqp := amqp{}59 amqp.Run()60}

Full Screen

Full Screen

Run

Using AI Code Generation

copy

Full Screen

1func main() {2 amqp := AMQP{}3 amqp.Run()4}5import (6type AMQP struct{}7func (amqp *AMQP) Run() {8 if err != nil {9 fmt.Println(err)10 }11 defer connection.Close()12 channel, err := connection.Channel()13 if err != nil {14 fmt.Println(err)15 }16 defer channel.Close()17 queue, err := channel.QueueDeclare(18 if err != nil {19 fmt.Println(err)20 }21 err = channel.Publish(22 amqp.Publishing{23 Body: []byte("Hello World!"),24 },25 if err != nil {26 fmt.Println(err)27 }28 fmt.Println("Message Published")29}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Venom automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful