How to use the Drain method of the testutils package

Best K6 code snippets using testutils.Drain

drain_test.go

Source:drain_test.go Github

copy

Full Screen

...140 bitFlyerClient := mock_bitflyer.NewMockClient(ctrl)141 bitFlyerClient.EXPECT().142 FetchQuote(ctx, "BAT_JPY", false).143 Return(&quote, nil)144 drainTransfers := make([]DrainTransfer, 5)145 for i := 0; i < len(drainTransfers); i++ {146 depositID := ptr.FromString(uuid.NewV4().String())147 // set invalid deposit id148 if i == 3 {149 depositID = nil150 }151 drainTransfers[i] = DrainTransfer{152 ID: ptr.FromUUID(uuid.NewV4()),153 Total: decimal.NewFromFloat(1),154 DepositID: depositID,155 }156 }157 datastore := NewMockDatastore(ctrl)158 datastore.EXPECT().159 GetDrainsByBatchID(ctx, batchID).160 Return(drainTransfers, nil)161 s := Service{162 bfClient: bitFlyerClient,163 Datastore: datastore,164 }165 expected := errorutils.New(fmt.Errorf("failed depositID cannot be nil for batchID %s", batchID),166 "submit batch transfer", drainCodeErrorInvalidDepositID)167 err := s.SubmitBatchTransfer(ctx, batchID)168 assert.Equal(t, expected, err)169}170func TestGetGeminiTxnStatus_Completed(t *testing.T) {171 ctrl := gomock.NewController(t)172 defer ctrl.Finish()173 apiKey := testutils.RandomString()174 clientID := testutils.RandomString()175 txRef := testutils.RandomString()176 ctx := context.Background()177 ctx = context.WithValue(ctx, appctx.GeminiAPIKeyCTXKey, apiKey)178 ctx = context.WithValue(ctx, appctx.GeminiClientIDCTXKey, clientID)179 response := &gemini.PayoutResult{180 Result: "Ok",181 Status: ptr.FromString("Completed"),182 }183 geminiClient := mock_gemini.NewMockClient(ctrl)184 geminiClient.EXPECT().185 CheckTxStatus(ctx, apiKey, clientID, txRef).186 Return(response, nil)187 service := Service{188 geminiClient: geminiClient,189 }190 actual, err := service.GetGeminiTxnStatus(ctx, txRef)191 assert.Nil(t, err)192 assert.Equal(t, "complete", actual.Status)193 assert.Equal(t, "", actual.Note)194}195func TestGetGeminiTxnStatus_Pending(t *testing.T) {196 ctrl := gomock.NewController(t)197 defer ctrl.Finish()198 apiKey := testutils.RandomString()199 clientID := 
testutils.RandomString()200 txRef := testutils.RandomString()201 ctx := context.Background()202 ctx = context.WithValue(ctx, appctx.GeminiAPIKeyCTXKey, apiKey)203 ctx = context.WithValue(ctx, appctx.GeminiClientIDCTXKey, clientID)204 response := &gemini.PayoutResult{205 Result: "Ok",206 Status: ptr.FromString("Pending"),207 }208 geminiClient := mock_gemini.NewMockClient(ctrl)209 geminiClient.EXPECT().210 CheckTxStatus(ctx, apiKey, clientID, txRef).211 Return(response, nil)212 service := Service{213 geminiClient: geminiClient,214 }215 actual, err := service.GetGeminiTxnStatus(ctx, txRef)216 assert.Nil(t, err)217 assert.Equal(t, "pending", actual.Status)218 assert.Equal(t, "", actual.Note)219}220func TestGetGeminiTxnStatus_Processing(t *testing.T) {221 ctrl := gomock.NewController(t)222 defer ctrl.Finish()223 apiKey := testutils.RandomString()224 clientID := testutils.RandomString()225 txRef := testutils.RandomString()226 ctx := context.Background()227 ctx = context.WithValue(ctx, appctx.GeminiAPIKeyCTXKey, apiKey)228 ctx = context.WithValue(ctx, appctx.GeminiClientIDCTXKey, clientID)229 response := &gemini.PayoutResult{230 Result: "Ok",231 Status: ptr.FromString("Processing"),232 }233 geminiClient := mock_gemini.NewMockClient(ctrl)234 geminiClient.EXPECT().235 CheckTxStatus(ctx, apiKey, clientID, txRef).236 Return(response, nil)237 service := Service{238 geminiClient: geminiClient,239 }240 actual, err := service.GetGeminiTxnStatus(ctx, txRef)241 assert.Nil(t, err)242 assert.Equal(t, "pending", actual.Status)243 assert.Equal(t, "", actual.Note)244}245func TestGetGeminiTxnStatus_Failed(t *testing.T) {246 ctrl := gomock.NewController(t)247 defer ctrl.Finish()248 apiKey := testutils.RandomString()249 clientID := testutils.RandomString()250 txRef := testutils.RandomString()251 ctx := context.Background()252 ctx = context.WithValue(ctx, appctx.GeminiAPIKeyCTXKey, apiKey)253 ctx = context.WithValue(ctx, appctx.GeminiClientIDCTXKey, clientID)254 response := 
&gemini.PayoutResult{255 Result: "Ok",256 Status: ptr.FromString("Failed"),257 Reason: ptr.FromString(testutils.RandomString()),258 }259 geminiClient := mock_gemini.NewMockClient(ctrl)260 geminiClient.EXPECT().261 CheckTxStatus(ctx, apiKey, clientID, txRef).262 Return(response, nil)263 service := Service{264 geminiClient: geminiClient,265 }266 actual, err := service.GetGeminiTxnStatus(ctx, txRef)267 assert.Nil(t, err)268 assert.Equal(t, "failed", actual.Status)269 assert.Equal(t, *response.Reason, actual.Note)270}271func TestGetGeminiTxnStatus_Unknown(t *testing.T) {272 ctrl := gomock.NewController(t)273 defer ctrl.Finish()274 apiKey := testutils.RandomString()275 clientID := testutils.RandomString()276 txRef := testutils.RandomString()277 ctx := context.Background()278 ctx = context.WithValue(ctx, appctx.GeminiAPIKeyCTXKey, apiKey)279 ctx = context.WithValue(ctx, appctx.GeminiClientIDCTXKey, clientID)280 response := &gemini.PayoutResult{281 Result: "Ok",282 Status: ptr.FromString(testutils.RandomString()),283 }284 geminiClient := mock_gemini.NewMockClient(ctrl)285 geminiClient.EXPECT().286 CheckTxStatus(ctx, apiKey, clientID, txRef).287 Return(response, nil)288 service := Service{289 geminiClient: geminiClient,290 }291 actual, err := service.GetGeminiTxnStatus(ctx, txRef)292 assert.Nil(t, actual)293 assert.Error(t, err, fmt.Errorf("failed to get txn status for %s: unknown status %s",294 txRef, ptr.String(response.Status)).Error())295}296func TestGetGeminiTxnStatus_Response_Nil(t *testing.T) {297 ctrl := gomock.NewController(t)298 defer ctrl.Finish()299 apiKey := testutils.RandomString()300 clientID := testutils.RandomString()301 txRef := testutils.RandomString()302 ctx := context.Background()303 ctx = context.WithValue(ctx, appctx.GeminiAPIKeyCTXKey, apiKey)304 ctx = context.WithValue(ctx, appctx.GeminiClientIDCTXKey, clientID)305 geminiClient := mock_gemini.NewMockClient(ctrl)306 geminiClient.EXPECT().307 CheckTxStatus(ctx, apiKey, clientID, txRef).308 
Return(nil, nil)309 service := Service{310 geminiClient: geminiClient,311 }312 actual, err := service.GetGeminiTxnStatus(ctx, txRef)313 assert.Nil(t, actual)314 assert.EqualError(t, err, fmt.Errorf("failed to get gemini txn status for %s: response nil", txRef).Error())315}316func TestGetGeminiTxnStatus_CheckStatus_Error(t *testing.T) {317 ctrl := gomock.NewController(t)318 defer ctrl.Finish()319 apiKey := testutils.RandomString()320 clientID := testutils.RandomString()321 txRef := testutils.RandomString()322 ctx := context.Background()323 ctx = context.WithValue(ctx, appctx.GeminiAPIKeyCTXKey, apiKey)324 ctx = context.WithValue(ctx, appctx.GeminiClientIDCTXKey, clientID)325 clientError := errors.New(testutils.RandomString())326 geminiClient := mock_gemini.NewMockClient(ctrl)327 geminiClient.EXPECT().328 CheckTxStatus(ctx, apiKey, clientID, txRef).329 Return(nil, clientError)330 service := Service{331 geminiClient: geminiClient,332 }333 actual, err := service.GetGeminiTxnStatus(ctx, txRef)334 assert.Nil(t, actual)335 assert.EqualError(t, err, fmt.Errorf("failed to check gemini txn status for %s: %w", txRef, clientError).Error())336}337func TestGetGeminiTxnStatus_CheckStatus_ErrorBundle(t *testing.T) {338 ctrl := gomock.NewController(t)339 defer ctrl.Finish()340 apiKey := testutils.RandomString()341 clientID := testutils.RandomString()342 txRef := testutils.RandomString()343 ctx := context.Background()344 ctx = context.WithValue(ctx, appctx.GeminiAPIKeyCTXKey, apiKey)345 ctx = context.WithValue(ctx, appctx.GeminiClientIDCTXKey, clientID)346 header := http.Header{}347 header.Add(testutils.RandomString(), testutils.RandomString())348 header.Add(testutils.RandomString(), testutils.RandomString())349 path := testutils.RandomString()350 status := http.StatusInternalServerError351 message := testutils.RandomString()352 errorData := struct {353 ResponseHeaders interface{}354 Body interface{}355 }{356 ResponseHeaders: header,357 Body: testutils.RandomString(),358 }359 
wrappedError := errors.New(testutils.RandomString())360 errorBundle := clients.NewHTTPError(wrappedError, path, message, status, errorData)361 clientError := fmt.Errorf("client error %w", errorBundle)362 geminiClient := mock_gemini.NewMockClient(ctrl)363 geminiClient.EXPECT().364 CheckTxStatus(ctx, apiKey, clientID, txRef).365 Return(nil, clientError)366 service := Service{367 geminiClient: geminiClient,368 }369 actual, err := service.GetGeminiTxnStatus(ctx, txRef)370 assert.Nil(t, actual)371 assert.EqualError(t, err, fmt.Errorf("failed to check gemini txn status for %s: %w", txRef, clientError).Error())372}373func TestGetGeminiTxnStatus_CheckStatus_ErrorBundle_StatusNotFound(t *testing.T) {374 ctrl := gomock.NewController(t)375 defer ctrl.Finish()376 apiKey := testutils.RandomString()377 clientID := testutils.RandomString()378 txRef := testutils.RandomString()379 ctx := context.Background()380 ctx = context.WithValue(ctx, appctx.GeminiAPIKeyCTXKey, apiKey)381 ctx = context.WithValue(ctx, appctx.GeminiClientIDCTXKey, clientID)382 header := http.Header{}383 header.Add(testutils.RandomString(), testutils.RandomString())384 header.Add(testutils.RandomString(), testutils.RandomString())385 path := testutils.RandomString()386 status := http.StatusNotFound387 message := testutils.RandomString()388 errorData := struct {389 ResponseHeaders interface{}390 Body interface{}391 }{392 ResponseHeaders: header,393 Body: testutils.RandomString(),394 }395 wrappedError := errors.New(testutils.RandomString())396 errorBundle := clients.NewHTTPError(wrappedError, path, message, status, errorData)397 clientError := fmt.Errorf("client error %w", errorBundle)398 geminiClient := mock_gemini.NewMockClient(ctrl)399 geminiClient.EXPECT().400 CheckTxStatus(ctx, apiKey, clientID, txRef).401 Return(nil, clientError)402 service := Service{403 geminiClient: geminiClient,404 }405 actual, err := service.GetGeminiTxnStatus(ctx, txRef)406 assert.Nil(t, err)407 assert.Equal(t, "failed", 
actual.Status)408 assert.Equal(t, "GEMINI_NOT_FOUND", actual.Note)409}410func TestGetGeminiTxnStatus_ResponseError_NoReason(t *testing.T) {411 ctrl := gomock.NewController(t)412 defer ctrl.Finish()413 apiKey := testutils.RandomString()414 clientID := testutils.RandomString()415 txRef := testutils.RandomString()416 ctx := context.Background()417 ctx = context.WithValue(ctx, appctx.GeminiAPIKeyCTXKey, apiKey)418 ctx = context.WithValue(ctx, appctx.GeminiClientIDCTXKey, clientID)419 response := &gemini.PayoutResult{420 Result: "Error",421 }422 geminiClient := mock_gemini.NewMockClient(ctrl)423 geminiClient.EXPECT().424 CheckTxStatus(ctx, apiKey, clientID, txRef).425 Return(response, nil)426 service := Service{427 geminiClient: geminiClient,428 }429 actual, err := service.GetGeminiTxnStatus(ctx, txRef)430 assert.Nil(t, actual)431 assert.EqualError(t, err, fmt.Errorf("failed to get gemini txn status for %s: unknown gemini response error", txRef).Error())432}433func TestGetGeminiTxnStatus_ResponseError_WithReason(t *testing.T) {434 ctrl := gomock.NewController(t)435 defer ctrl.Finish()436 apiKey := testutils.RandomString()437 clientID := testutils.RandomString()438 txRef := testutils.RandomString()439 ctx := context.Background()440 ctx = context.WithValue(ctx, appctx.GeminiAPIKeyCTXKey, apiKey)441 ctx = context.WithValue(ctx, appctx.GeminiClientIDCTXKey, clientID)442 response := &gemini.PayoutResult{443 Result: "Error",444 Reason: ptr.FromString(testutils.RandomString()),445 }446 geminiClient := mock_gemini.NewMockClient(ctrl)447 geminiClient.EXPECT().448 CheckTxStatus(ctx, apiKey, clientID, txRef).449 Return(response, nil)450 service := Service{451 geminiClient: geminiClient,452 }453 actual, err := service.GetGeminiTxnStatus(ctx, txRef)454 assert.Nil(t, actual)455 assert.EqualError(t, err, fmt.Errorf("failed to get gemini txn status for %s: %s", txRef, *response.Reason).Error())456}457func TestSubmitBatchTransfer_UploadBulkPayout_NOINV(t *testing.T) {458 ctrl := 
gomock.NewController(t)459 defer ctrl.Finish()460 ctx, _ := logging.SetupLogger(context.Background())461 batchID := ptr.FromUUID(uuid.NewV4())462 quote := bitflyer.Quote{463 Rate: decimal.New(1, 1),464 }465 bfClient := mock_bitflyer.NewMockClient(ctrl)466 bfClient.EXPECT().467 FetchQuote(ctx, "BAT_JPY", false).468 Return(&quote, nil)469 drainTransfers := make([]DrainTransfer, 1)470 drainTransfers[0] = DrainTransfer{471 ID: ptr.FromUUID(uuid.NewV4()),472 Total: decimal.NewFromFloat(1),473 DepositID: ptr.FromString(uuid.NewV4().String()),474 }475 datastore := NewMockDatastore(ctrl)476 datastore.EXPECT().477 GetDrainsByBatchID(ctx, batchID).478 Return(drainTransfers, nil)479 var bitflyerError = new(clients.BitflyerError)480 bitflyerError.HTTPStatusCode = http.StatusUnauthorized481 bfClient.EXPECT().482 UploadBulkPayout(ctx, gomock.Any()).483 Return(nil, bitflyerError)484 bfClient.EXPECT().485 RefreshToken(ctx, gomock.Any()).486 Return(nil, nil)487 withdrawal := bitflyer.WithdrawToDepositIDResponse{488 Status: "NO_INV",489 }490 withdrawToDepositIDBulkResponse := bitflyer.WithdrawToDepositIDBulkResponse{491 DryRun: false,492 Withdrawals: []bitflyer.WithdrawToDepositIDResponse{493 withdrawal,494 },495 }496 bfClient.EXPECT().497 UploadBulkPayout(ctx, gomock.Any()).498 Return(&withdrawToDepositIDBulkResponse, nil)499 s := Service{500 bfClient: bfClient,501 Datastore: datastore,502 }503 err := fmt.Errorf("submit batch transfer error: bitflyer %s error for batchID %s",504 withdrawal.Status, withdrawal.TransferID)505 codified := errorutils.Codified{506 ErrCode: "bitflyer_no_inv",507 Retry: false,508 }509 expected := errorutils.New(err, "submit batch transfer", codified)510 actual := s.SubmitBatchTransfer(ctx, batchID)511 assert.Equal(t, expected, actual)512}513func TestSubmitBatchTransfer_UploadBulkPayout_Error(t *testing.T) {514 ctrl := gomock.NewController(t)515 defer ctrl.Finish()516 ctx, _ := logging.SetupLogger(context.Background())517 batchID := 
ptr.FromUUID(uuid.NewV4())518 quote := bitflyer.Quote{519 Rate: decimal.New(1, 1),520 }521 bfClient := mock_bitflyer.NewMockClient(ctrl)522 bfClient.EXPECT().523 FetchQuote(ctx, "BAT_JPY", false).524 Return(&quote, nil)525 drainTransfers := make([]DrainTransfer, 1)526 drainTransfers[0] = DrainTransfer{527 ID: ptr.FromUUID(uuid.NewV4()),528 Total: decimal.NewFromFloat(1),529 DepositID: ptr.FromString(uuid.NewV4().String()),530 }531 datastore := NewMockDatastore(ctrl)532 datastore.EXPECT().533 GetDrainsByBatchID(ctx, batchID).534 Return(drainTransfers, nil)535 var bitflyerError = new(clients.BitflyerError)536 bitflyerError.HTTPStatusCode = http.StatusUnauthorized537 err := errors.New("some error")538 bfClient.EXPECT().539 UploadBulkPayout(ctx, gomock.Any()).540 Return(nil, err)541 s := Service{542 bfClient: bfClient,543 Datastore: datastore,544 }545 actual := s.SubmitBatchTransfer(ctx, batchID)546 assert.EqualError(t, actual, fmt.Sprintf("failed to transfer funds: %s", err.Error()))547}548func TestSubmitBatchTransfer_UploadBulkPayout_Bitflyer_Unauthorized_Retry(t *testing.T) {549 ctrl := gomock.NewController(t)550 defer ctrl.Finish()551 ctx, _ := logging.SetupLogger(context.Background())552 batchID := ptr.FromUUID(uuid.NewV4())553 quote := bitflyer.Quote{554 Rate: decimal.New(1, 1),555 }556 bfClient := mock_bitflyer.NewMockClient(ctrl)557 bfClient.EXPECT().558 FetchQuote(ctx, "BAT_JPY", false).559 Return(&quote, nil)560 drainTransfers := make([]DrainTransfer, 1)561 drainTransfers[0] = DrainTransfer{562 ID: ptr.FromUUID(uuid.NewV4()),563 Total: decimal.NewFromFloat(1),564 DepositID: ptr.FromString(uuid.NewV4().String()),565 }566 datastore := NewMockDatastore(ctrl)567 datastore.EXPECT().568 GetDrainsByBatchID(ctx, batchID).569 Return(drainTransfers, nil)570 var bitflyerError = new(clients.BitflyerError)571 bitflyerError.HTTPStatusCode = http.StatusUnauthorized572 bfClient.EXPECT().573 UploadBulkPayout(ctx, gomock.Any()).574 Return(nil, bitflyerError)575 
bfClient.EXPECT().576 RefreshToken(ctx, gomock.Any()).577 Return(nil, nil)578 withdrawToDepositIDBulkResponse := bitflyer.WithdrawToDepositIDBulkResponse{579 DryRun: false,580 Withdrawals: []bitflyer.WithdrawToDepositIDResponse{581 {582 Status: "SUCCESS",583 },584 },585 }586 bfClient.EXPECT().587 UploadBulkPayout(ctx, gomock.Any()).588 Return(&withdrawToDepositIDBulkResponse, nil)589 s := Service{590 bfClient: bfClient,591 Datastore: datastore,592 }593 err := s.SubmitBatchTransfer(ctx, batchID)594 assert.Nil(t, err)595}596func TestSubmitBatchTransfer_UploadBulkPayout_Bitflyer_Unauthorized_NoRetry(t *testing.T) {597 ctrl := gomock.NewController(t)598 defer ctrl.Finish()599 ctx, _ := logging.SetupLogger(context.Background())600 batchID := ptr.FromUUID(uuid.NewV4())601 quote := bitflyer.Quote{602 Rate: decimal.New(1, 1),603 }604 bfClient := mock_bitflyer.NewMockClient(ctrl)605 bfClient.EXPECT().606 FetchQuote(ctx, "BAT_JPY", false).607 Return(&quote, nil)608 drainTransfers := make([]DrainTransfer, 1)609 drainTransfers[0] = DrainTransfer{610 ID: ptr.FromUUID(uuid.NewV4()),611 Total: decimal.NewFromFloat(1),612 DepositID: ptr.FromString(uuid.NewV4().String()),613 }614 datastore := NewMockDatastore(ctrl)615 datastore.EXPECT().616 GetDrainsByBatchID(ctx, batchID).617 Return(drainTransfers, nil)618 var bitflyerError = new(clients.BitflyerError)619 bitflyerError.HTTPStatusCode = http.StatusUnauthorized620 bfClient.EXPECT().621 UploadBulkPayout(ctx, gomock.Any()).622 Return(nil, bitflyerError)623 refreshTokenError := errors.New("some error")624 bfClient.EXPECT().625 RefreshToken(ctx, gomock.Any()).626 Return(nil, refreshTokenError)627 s := Service{628 bfClient: bfClient,629 Datastore: datastore,630 }631 err := s.SubmitBatchTransfer(ctx, batchID)632 assert.EqualError(t, err, fmt.Errorf("failed to get token from bf: %w", refreshTokenError).Error())633}634func TestSubmitBatchTransfer_UploadBulkPayout_Bitflyer_NoWithdrawals(t *testing.T) {635 ctrl := gomock.NewController(t)636 
defer ctrl.Finish()637 ctx, _ := logging.SetupLogger(context.Background())638 batchID := ptr.FromUUID(uuid.NewV4())639 quote := bitflyer.Quote{640 Rate: decimal.New(1, 1),641 }642 bfClient := mock_bitflyer.NewMockClient(ctrl)643 bfClient.EXPECT().644 FetchQuote(ctx, "BAT_JPY", false).645 Return(&quote, nil)646 drainTransfers := make([]DrainTransfer, 1)647 drainTransfers[0] = DrainTransfer{648 ID: ptr.FromUUID(uuid.NewV4()),649 Total: decimal.NewFromFloat(1),650 DepositID: ptr.FromString(uuid.NewV4().String()),651 }652 datastore := NewMockDatastore(ctrl)653 datastore.EXPECT().654 GetDrainsByBatchID(ctx, batchID).655 Return(drainTransfers, nil)656 var bitflyerError = new(clients.BitflyerError)657 bitflyerError.HTTPStatusCode = http.StatusUnauthorized658 bfClient.EXPECT().659 UploadBulkPayout(ctx, gomock.Any()).660 Return(nil, bitflyerError)661 bfClient.EXPECT().662 RefreshToken(ctx, gomock.Any()).663 Return(nil, nil)664 // no withdraws665 withdrawToDepositIDBulkResponse := bitflyer.WithdrawToDepositIDBulkResponse{666 DryRun: false,667 Withdrawals: []bitflyer.WithdrawToDepositIDResponse{},668 }...

Full Screen

Full Screen

inbox_test.go

Source:inbox_test.go Github

copy

Full Screen

...146 require.True(t, testutils.IsError(streamErr, "stream arrived too late"), streamErr)147}148// TestInboxShutdown is a random test that spawns a goroutine for handling a149// FlowStream RPC (setting up an inbound stream, or RunWithStream), and a150// goroutine to read from an Inbox (Next and DrainMeta goroutine) that gets151// randomly canceled.152// These goroutines race against each other and the153// desired state is that everything is cleaned up at the end. Examples of154// scenarios that are tested by this test include but are not limited to:155// - DrainMeta called before Next and before a stream arrives.156// - DrainMeta called with an active stream.157// - A forceful cancellation of Next but no call to DrainMeta.158func TestInboxShutdown(t *testing.T) {159 defer leaktest.AfterTest(t)()160 var (161 rng, _ = randutil.NewPseudoRand()162 // infiniteBatches will influence whether or not we're likely to test a163 // graceful shutdown (since other shutdown mechanisms might happen before164 // we reach the end of the data stream). If infiniteBatches is true,165 // shutdown scenarios in the middle of data processing are always tested. 
If166 // false, they sometimes will be.167 infiniteBatches = rng.Float64() < 0.5168 cancelSleep = time.Millisecond * time.Duration(rng.Intn(10))169 nextSleep = time.Millisecond * time.Duration(rng.Intn(10))170 runWithStreamSleep = time.Millisecond * time.Duration(rng.Intn(10))171 typs = []*types.T{types.Int}172 batch = coldatatestutils.RandomBatch(testAllocator, rng, typs, coldata.BatchSize(), 0 /* length */, rng.Float64())173 )174 // drainMetaScenario specifies when DrainMeta should be called in the Next175 // goroutine.176 type drainMetaScenario int177 const (178 // drainMetaBeforeNext specifies that DrainMeta should be called before the179 // Next goroutine.180 drainMetaBeforeNext drainMetaScenario = iota181 // drainMetaPrematurely specifies that DrainMeta should be called after the182 // first call to Next.183 drainMetaPrematurely184 // drianMetaAfterNextIsExhausted specifies that DrainMeta should be called185 // after Next returns a zero-length batch.186 drainMetaAfterNextIsExhausted187 // drainMetaNotCalled specifies that DrainMeta is not called during188 // execution.189 drainMetaNotCalled190 maxDrainMetaScenario191 )192 for _, cancel := range []bool{false, true} {193 for _, runNextGoroutine := range []bool{false, true} {194 drainScenario := drainMetaScenario(rng.Intn(int(maxDrainMetaScenario)))195 if infiniteBatches {196 // This test won't finish unless we drain at some point.197 drainScenario = drainMetaPrematurely198 }199 if cancel {200 // If a cancel happens, we don't want to call DrainMeta because the mock201 // RPC layer doesn't handle this case well. The drain signal could be202 // sent over a closed channel. 
The real RPC layer would return EOF, but203 // it's not easy to check for a closed channel without reading from it.204 drainScenario = drainMetaNotCalled205 }206 for _, runRunWithStreamGoroutine := range []bool{false, true} {207 if runNextGoroutine == false && runRunWithStreamGoroutine == true {208 // This is sort of like a remote node connecting to the inbox, but the209 // inbox will never be spawned. This is dealt with by another part of210 // the code (the flow registry times out inbound RPCs if a consumer is211 // not scheduled in time), so this case is skipped.212 continue213 }214 rpcLayer := makeMockFlowStreamRPCLayer()215 t.Run(fmt.Sprintf(216 "cancel=%t/next=%t/stream=%t/inf=%t",217 cancel, runNextGoroutine, runRunWithStreamGoroutine, infiniteBatches,218 ), func(t *testing.T) {219 inboxCtx, inboxCancel := context.WithCancel(context.Background())220 inboxMemAccount := testMemMonitor.MakeBoundAccount()221 defer inboxMemAccount.Close(inboxCtx)222 inbox, err := NewInbox(223 colmem.NewAllocator(inboxCtx, &inboxMemAccount, coldata.StandardColumnFactory),224 typs, execinfrapb.StreamID(0),225 )226 require.NoError(t, err)227 c, err := colserde.NewArrowBatchConverter(typs)228 require.NoError(t, err)229 r, err := colserde.NewRecordBatchSerializer(typs)230 require.NoError(t, err)231 goroutines := []struct {232 name string233 asyncOperation func() chan error234 }{235 {236 name: "RunWithStream",237 asyncOperation: func() chan error {238 errCh := make(chan error)239 go func() {240 var wg sync.WaitGroup241 defer close(errCh)242 if runWithStreamSleep != 0 {243 time.Sleep(runWithStreamSleep)244 }245 if !runRunWithStreamGoroutine {246 // The inbox needs to be timed out. This is called by the inbound247 // stream code during normal operation. 
This timeout simulates a248 // stream not arriving in time.249 inbox.Timeout(errors.New("artificial timeout"))250 return251 }252 quitSending := make(chan struct{})253 wg.Add(1)254 go func() {255 defer wg.Done()256 arrowData, err := c.BatchToArrow(batch)257 if err != nil {258 errCh <- err259 return260 }261 var buffer bytes.Buffer262 _, _, err = r.Serialize(&buffer, arrowData)263 if err != nil {264 errCh <- err265 return266 }267 var draining uint32268 // Listen for the drain signal.269 wg.Add(1)270 go func() {271 defer wg.Done()272 for {273 cs, err := rpcLayer.client.Recv()274 if cs != nil && cs.DrainRequest != nil {275 atomic.StoreUint32(&draining, 1)276 return277 }278 // TODO(asubiotto): Generate some metadata and test279 // that it is received.280 if err != nil {281 if err == io.EOF {282 return283 }284 errCh <- err285 }286 }287 }()288 msg := &execinfrapb.ProducerMessage{Data: execinfrapb.ProducerData{RawBytes: buffer.Bytes()}}289 batchesToSend := rng.Intn(65536)290 for i := 0; infiniteBatches || i < batchesToSend; i++ {291 if atomic.LoadUint32(&draining) == 1 {292 break293 }294 quitLoop := false295 select {296 case rpcLayer.client.pmChan <- msg:297 case <-quitSending:298 quitLoop = true299 }300 if quitLoop {301 break302 }303 }304 if err := rpcLayer.client.CloseSend(); err != nil {305 errCh <- err306 }307 }()308 // Use context.Background() because it's separate from the309 // inbox context.310 handleErr := <-handleStream(context.Background(), inbox, rpcLayer.server, func() { close(rpcLayer.server.csChan) })311 close(quitSending)312 wg.Wait()313 errCh <- handleErr314 }()315 return errCh316 },317 },318 {319 name: "Next",320 asyncOperation: func() chan error {321 errCh := make(chan error)322 go func() {323 defer close(errCh)324 if !runNextGoroutine {325 return326 }327 if drainScenario == drainMetaBeforeNext {328 _ = inbox.DrainMeta(inboxCtx)329 return330 }331 if nextSleep != 0 {332 time.Sleep(nextSleep)333 }334 var (335 done bool336 err error337 )338 for !done && err 
== nil {339 err = colexecerror.CatchVectorizedRuntimeError(func() { b := inbox.Next(inboxCtx); done = b.Length() == 0 })340 if drainScenario == drainMetaPrematurely {341 _ = inbox.DrainMeta(inboxCtx)342 return343 }344 }345 if drainScenario == drainMetaAfterNextIsExhausted {346 _ = inbox.DrainMeta(inboxCtx)347 }348 errCh <- err349 }()350 return errCh351 },352 },353 {354 name: "Cancel",355 asyncOperation: func() chan error {356 errCh := make(chan error)357 go func() {358 defer func() {359 if cancel {360 inboxCancel()...

Full Screen

Full Screen

Drain

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 f, err := os.Open("test.txt")4 if err != nil {5 fmt.Println(err)6 }7 scanner := bufio.NewScanner(f)8 scanner.Split(bufio.ScanWords)9 for scanner.Scan() {10 }11 fmt.Printf("Words: %d12 err = f.Close()13 if err != nil {14 fmt.Println(err)15 }16 if err := scanner.Err(); err != nil {17 fmt.Fprintln(os.Stderr, "reading input:", err)18 }19}20import (21func main() {22 f, err := os.Open("test.txt")23 if err != nil {24 fmt.Println(err)25 }26 scanner := bufio.NewScanner(f)27 scanner.Split(bufio.ScanLines)28 for scanner.Scan() {29 }30 fmt.Printf("Lines: %d31 err = f.Close()32 if err != nil {33 fmt.Println(err)34 }35 if err := scanner.Err(); err != nil {36 fmt.Fprintln(os.Stderr, "reading input:", err)37 }38}39import (40func main() {41 f, err := os.Open("test.txt")42 if err != nil {43 fmt.Println(err)44 }

Full Screen

Full Screen

Drain

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 f := new(testrunnerfakes.FakeTestRunner)4 f.DrainReturns(1, nil)5 fmt.Println(f.Drain())6}7import (8func main() {9 f := new(testrunnerfakes.FakeTestRunner)10 f.DrainReturns(1, nil)11 fmt.Println(f.Drain())12}13import (14func main() {15 f := new(testrunnerfakes.FakeTestRunner)16 f.DrainReturns(1, nil)17 fmt.Println(f.Drain())18}19import (20func main() {21 f := new(testrunnerfakes.FakeTestRunner)22 f.DrainReturns(1, nil)23 fmt.Println(f.Drain())24}25import (26func main() {27 f := new(testrunnerfakes.FakeTestRunner)28 f.DrainReturns(1, nil)29 fmt.Println(f.Drain())30}31import (32func main() {33 f := new(testrunnerfakes.FakeTestRunner)34 f.DrainReturns(1, nil)35 fmt.Println(f.Drain())36}37import (

Full Screen

Full Screen

Drain

Using AI Code Generation

copy

Full Screen

1import (2func TestDrain(t *testing.T) {3 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {4 fmt.Fprintln(w, "Hello, client")5 }))6 defer ts.Close()7 res, err := http.Get(ts.URL)8 if err != nil {9 log.Fatal(err)10 }11 _, err = ioutil.ReadAll(res.Body)12 if err != nil {13 log.Fatal(err)14 }15 defer res.Body.Close()16 err = testutils.Drain(res.Body)17 if err != nil {18 log.Fatal(err)19 }20}21import (22func Drain(r io.Reader) error {23 _, err := ioutil.ReadAll(r)24}25import (26func TestDrain(t *testing.T) {27 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {28 fmt.Fprintln(w, "Hello, client")29 }))30 defer ts.Close()31 res, err := http.Get(ts.URL)32 if err != nil {33 log.Fatal(err)34 }35 _, err = ioutil.ReadAll(res.Body)36 if err != nil {37 log.Fatal(err)38 }39 defer res.Body.Close()40 err = testutils.Drain(res.Body)41 if err != nil {42 log.Fatal(err)43 }44}45import (46func Drain(r io

Full Screen

Full Screen

Drain

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 ch := make(chan string, 1)4 go func() {5 time.Sleep(2 * time.Second)6 }()7 fmt.Println(testutils.Drain(ch))8}9import (10func main() {11 ch := make(chan string, 1)12 go func() {13 time.Sleep(2 * time.Second)14 }()15 fmt.Println(testutils.Drain(ch))16}17import (18func main() {19 ch := make(chan string, 1)20 go func() {21 time.Sleep(2 * time.Second)22 }()23 fmt.Println(testutils.Drain(ch))24}25import (26func main() {27 ch := make(chan string, 1)28 go func() {29 time.Sleep(2 * time.Second)30 }()31 fmt.Println(testutils.Drain(ch))32}33import (34func main() {35 ch := make(chan string, 1)36 go func() {37 time.Sleep(2 * time.Second)38 }()39 fmt.Println(testutils.Drain(ch))40}

Full Screen

Full Screen

Drain

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {4 fmt.Fprint(w, "Hello, client")5 }))6 defer srv.Close()7 res, err := http.Get(srv.URL)8 if err != nil {9 log.Fatal(err)10 }11 fmt.Println(res)12 fmt.Println(res.Body)13 srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {14 fmt.Fprint(w, "Hello, client")15 }))16 defer srv1.Close()17 res, err = http.Get(srv1.URL)18 if err != nil {19 log.Fatal(err)20 }21 fmt.Println(res)22 fmt.Println(res.Body)23 srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {24 fmt.Fprint(w, "Hello, client")25 }))26 defer srv2.Close()27 res, err = http.Get(srv2.URL)28 if err != nil {29 log.Fatal(err)30 }31 fmt.Println(res)32 fmt.Println(res.Body)33 srv3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {34 fmt.Fprint(w, "Hello, client")35 }))36 defer srv3.Close()37 res, err = http.Get(srv3.URL)38 if err != nil {39 log.Fatal(err)40 }41 fmt.Println(res)42 fmt.Println(res.Body)43 srv4 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {44 fmt.Fprint(w, "Hello, client")45 }))

Full Screen

Full Screen

Drain

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 if err != nil {4 panic(err)5 }6 defer resp.Body.Close()7 fmt.Println(resp.Status)8}9body, err := ioutil.ReadAll(resp.Body)10if err != nil {11 panic(err)12}13fmt.Println(string(body))

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run K6 automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful