Best Gauge code snippet using execution.start
dbVisibilityPersistenceTest.go
Source:dbVisibilityPersistenceTest.go
...99 workflowExecution := types.WorkflowExecution{100 WorkflowID: "visibility-workflow-test",101 RunID: "fb15e4b5-356f-466d-8c6d-a29223e5c536",102 }103 startTime := time.Now().Add(time.Second * -5).UnixNano()104 startReq := &p.RecordWorkflowExecutionStartedRequest{105 DomainUUID: testDomainUUID,106 Execution: workflowExecution,107 WorkflowTypeName: "visibility-workflow",108 StartTimestamp: startTime,109 }110 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, startReq)111 s.Nil(err0)112 resp, err1 := s.VisibilityMgr.ListOpenWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{113 DomainUUID: testDomainUUID,114 PageSize: 1,115 EarliestTime: startTime,116 LatestTime: startTime,117 })118 s.Nil(err1)119 s.Equal(1, len(resp.Executions))120 s.assertOpenExecutionEquals(startReq, resp.Executions[0])121 closeReq := &p.RecordWorkflowExecutionClosedRequest{122 DomainUUID: testDomainUUID,123 Execution: workflowExecution,124 WorkflowTypeName: "visibility-workflow",125 StartTimestamp: startTime,126 CloseTimestamp: time.Now().UnixNano(),127 HistoryLength: 5,128 }129 err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, closeReq)130 s.Nil(err2)131 resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{132 DomainUUID: testDomainUUID,133 PageSize: 1,134 EarliestTime: startTime,135 LatestTime: startTime,136 })137 s.Nil(err3)138 s.Equal(0, len(resp.Executions))139 resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{140 DomainUUID: testDomainUUID,141 PageSize: 1,142 EarliestTime: startTime,143 LatestTime: startTime,144 })145 s.Nil(err4)146 s.Equal(1, len(resp.Executions))147 s.assertClosedExecutionEquals(closeReq, resp.Executions[0])148}149// TestCronVisibility test150func (s *DBVisibilityPersistenceSuite) TestCronVisibility() {151 ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)152 defer cancel()153 testDomainUUID := uuid.New()154 workflowExecution := types.WorkflowExecution{155 WorkflowID: "visibility-cron-workflow-test",156 RunID: "fb15e4b5-356f-466d-8c6d-a29223e5c537",157 }158 startTime := time.Now().Add(time.Second * -5).UnixNano()159 startReq := &p.RecordWorkflowExecutionStartedRequest{160 DomainUUID: testDomainUUID,161 Execution: workflowExecution,162 WorkflowTypeName: "visibility-cron-workflow",163 StartTimestamp: startTime,164 IsCron: true,165 }166 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, startReq)167 s.Nil(err0)168 resp, err1 := s.VisibilityMgr.ListOpenWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{169 DomainUUID: testDomainUUID,170 PageSize: 1,171 EarliestTime: startTime,172 LatestTime: startTime,173 })174 s.Nil(err1)175 s.Equal(1, len(resp.Executions))176 s.True(resp.Executions[0].IsCron)177 closeReq := &p.RecordWorkflowExecutionClosedRequest{178 DomainUUID: testDomainUUID,179 Execution: workflowExecution,180 WorkflowTypeName: "visibility-workflow",181 StartTimestamp: startTime,182 CloseTimestamp: time.Now().UnixNano(),183 HistoryLength: 5,184 IsCron: true,185 }186 err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, closeReq)187 s.Nil(err2)188 resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{189 DomainUUID: testDomainUUID,190 PageSize: 1,191 EarliestTime: startTime,192 LatestTime: startTime,193 })194 s.Nil(err4)195 s.Equal(1, len(resp.Executions))196 s.True(resp.Executions[0].IsCron)197}198// TestBasicVisibilityTimeSkew test199func (s *DBVisibilityPersistenceSuite) TestBasicVisibilityTimeSkew() {200 ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)201 defer cancel()202 testDomainUUID := uuid.New()203 workflowExecution := types.WorkflowExecution{204 WorkflowID: "visibility-workflow-test-time-skew",205 RunID: "fb15e4b5-356f-466d-8c6d-a29223e5c536",206 }207 startTime := time.Now().Add(time.Second * -5).UnixNano()208 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, &p.RecordWorkflowExecutionStartedRequest{209 DomainUUID: testDomainUUID,210 Execution: workflowExecution,211 WorkflowTypeName: "visibility-workflow",212 StartTimestamp: startTime,213 })214 s.Nil(err0)215 resp, err1 := s.VisibilityMgr.ListOpenWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{216 DomainUUID: testDomainUUID,217 PageSize: 1,218 EarliestTime: startTime,219 LatestTime: startTime,220 })221 s.Nil(err1)222 s.Equal(1, len(resp.Executions))223 s.Equal(workflowExecution.WorkflowID, resp.Executions[0].Execution.WorkflowID)224 err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, &p.RecordWorkflowExecutionClosedRequest{225 DomainUUID: testDomainUUID,226 Execution: workflowExecution,227 WorkflowTypeName: "visibility-workflow",228 StartTimestamp: startTime,229 CloseTimestamp: startTime - (10 * time.Second).Nanoseconds(),230 })231 s.Nil(err2)232 resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{233 DomainUUID: testDomainUUID,234 PageSize: 1,235 EarliestTime: startTime,236 LatestTime: startTime,237 })238 s.Nil(err3)239 s.Equal(0, len(resp.Executions))240 resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{241 DomainUUID: testDomainUUID,242 PageSize: 1,243 EarliestTime: startTime,244 LatestTime: startTime,245 })246 s.Nil(err4)247 s.Equal(1, len(resp.Executions))248}249// TestVisibilityPagination test250func (s *DBVisibilityPersistenceSuite) TestVisibilityPagination() {251 ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)252 defer cancel()253 testDomainUUID := uuid.New()254 // Create 2 executions255 startTime1 := time.Now()256 workflowExecution1 := types.WorkflowExecution{257 WorkflowID: "visibility-pagination-test1",258 RunID: "fb15e4b5-356f-466d-8c6d-a29223e5c536",259 }260 startReq1 := &p.RecordWorkflowExecutionStartedRequest{261 DomainUUID: testDomainUUID,262 Execution: workflowExecution1,263 WorkflowTypeName: "visibility-workflow",264 StartTimestamp: startTime1.UnixNano(),265 }266 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, startReq1)267 s.Nil(err0)268 startTime2 := startTime1.Add(time.Second)269 workflowExecution2 := types.WorkflowExecution{270 WorkflowID: "visibility-pagination-test2",271 RunID: "843f6fc7-102a-4c63-a2d4-7c653b01bf52",272 }273 startReq2 := &p.RecordWorkflowExecutionStartedRequest{274 DomainUUID: testDomainUUID,275 Execution: workflowExecution2,276 WorkflowTypeName: "visibility-workflow",277 StartTimestamp: startTime2.UnixNano(),278 }279 err1 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, startReq2)280 s.Nil(err1)281 // Get the first one282 resp, err2 := s.VisibilityMgr.ListOpenWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{283 DomainUUID: testDomainUUID,284 PageSize: 1,285 EarliestTime: startTime1.UnixNano(),286 LatestTime: startTime2.UnixNano(),287 })288 s.Nil(err2)289 s.Equal(1, len(resp.Executions))290 s.assertOpenExecutionEquals(startReq2, resp.Executions[0])291 // Use token to get the second one292 resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{293 DomainUUID: testDomainUUID,294 PageSize: 1,295 EarliestTime: startTime1.UnixNano(),296 LatestTime: startTime2.UnixNano(),297 NextPageToken: resp.NextPageToken,298 })299 s.Nil(err3)300 s.Equal(1, len(resp.Executions))301 s.assertOpenExecutionEquals(startReq1, resp.Executions[0])302 // It is possible to not return non empty token which is going to return empty result303 if len(resp.NextPageToken) != 0 {304 // Now should get empty result by using token305 resp, err4 := s.VisibilityMgr.ListOpenWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{306 DomainUUID: testDomainUUID,307 PageSize: 1,308 EarliestTime: startTime1.UnixNano(),309 LatestTime: startTime2.UnixNano(),310 NextPageToken: resp.NextPageToken,311 })312 s.Nil(err4)313 s.Equal(0, len(resp.Executions))314 }315}316// TestFilteringByType test317func (s *DBVisibilityPersistenceSuite) TestFilteringByType() {318 ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)319 defer cancel()320 testDomainUUID := uuid.New()321 startTime := time.Now().UnixNano()322 // Create 2 executions323 workflowExecution1 := types.WorkflowExecution{324 WorkflowID: "visibility-filtering-test1",325 RunID: "fb15e4b5-356f-466d-8c6d-a29223e5c536",326 }327 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, &p.RecordWorkflowExecutionStartedRequest{328 DomainUUID: testDomainUUID,329 Execution: workflowExecution1,330 WorkflowTypeName: "visibility-workflow-1",331 StartTimestamp: startTime,332 })333 s.Nil(err0)334 workflowExecution2 := types.WorkflowExecution{335 WorkflowID: "visibility-filtering-test2",336 RunID: "843f6fc7-102a-4c63-a2d4-7c653b01bf52",337 }338 err1 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, &p.RecordWorkflowExecutionStartedRequest{339 DomainUUID: testDomainUUID,340 Execution: workflowExecution2,341 WorkflowTypeName: "visibility-workflow-2",342 StartTimestamp: startTime,343 })344 s.Nil(err1)345 // List open with filtering346 resp, err2 := s.VisibilityMgr.ListOpenWorkflowExecutionsByType(ctx, &p.ListWorkflowExecutionsByTypeRequest{347 ListWorkflowExecutionsRequest: p.ListWorkflowExecutionsRequest{348 DomainUUID: testDomainUUID,349 PageSize: 2,350 EarliestTime: startTime,351 LatestTime: startTime,352 },353 WorkflowTypeName: "visibility-workflow-1",354 })355 s.Nil(err2)356 s.Equal(1, len(resp.Executions))357 s.Equal(workflowExecution1.WorkflowID, resp.Executions[0].Execution.WorkflowID)358 // Close both executions359 err3 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, &p.RecordWorkflowExecutionClosedRequest{360 DomainUUID: testDomainUUID,361 Execution: workflowExecution1,362 WorkflowTypeName: "visibility-workflow-1",363 StartTimestamp: startTime,364 CloseTimestamp: time.Now().UnixNano(),365 })366 s.Nil(err3)367 closeReq := &p.RecordWorkflowExecutionClosedRequest{368 DomainUUID: testDomainUUID,369 Execution: workflowExecution2,370 WorkflowTypeName: "visibility-workflow-2",371 StartTimestamp: startTime,372 CloseTimestamp: time.Now().UnixNano(),373 HistoryLength: 3,374 }375 err4 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, closeReq)376 s.Nil(err4)377 // List closed with filtering378 resp, err5 := s.VisibilityMgr.ListClosedWorkflowExecutionsByType(ctx, &p.ListWorkflowExecutionsByTypeRequest{379 ListWorkflowExecutionsRequest: p.ListWorkflowExecutionsRequest{380 DomainUUID: testDomainUUID,381 PageSize: 2,382 EarliestTime: startTime,383 LatestTime: startTime,384 },385 WorkflowTypeName: "visibility-workflow-2",386 })387 s.Nil(err5)388 s.Equal(1, len(resp.Executions))389 s.assertClosedExecutionEquals(closeReq, resp.Executions[0])390}391// TestFilteringByWorkflowID test392func (s *DBVisibilityPersistenceSuite) TestFilteringByWorkflowID() {393 ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)394 defer cancel()395 testDomainUUID := uuid.New()396 startTime := time.Now().UnixNano()397 // Create 2 executions398 workflowExecution1 := types.WorkflowExecution{399 WorkflowID: "visibility-filtering-test1",400 RunID: "fb15e4b5-356f-466d-8c6d-a29223e5c536",401 }402 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, &p.RecordWorkflowExecutionStartedRequest{403 DomainUUID: testDomainUUID,404 Execution: workflowExecution1,405 WorkflowTypeName: "visibility-workflow",406 StartTimestamp: startTime,407 })408 s.Nil(err0)409 workflowExecution2 := types.WorkflowExecution{410 WorkflowID: "visibility-filtering-test2",411 RunID: "843f6fc7-102a-4c63-a2d4-7c653b01bf52",412 }413 err1 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, &p.RecordWorkflowExecutionStartedRequest{414 DomainUUID: testDomainUUID,415 Execution: workflowExecution2,416 WorkflowTypeName: "visibility-workflow",417 StartTimestamp: startTime,418 })419 s.Nil(err1)420 // List open with filtering421 resp, err2 := s.VisibilityMgr.ListOpenWorkflowExecutionsByWorkflowID(ctx, &p.ListWorkflowExecutionsByWorkflowIDRequest{422 ListWorkflowExecutionsRequest: p.ListWorkflowExecutionsRequest{423 DomainUUID: testDomainUUID,424 PageSize: 2,425 EarliestTime: startTime,426 LatestTime: startTime,427 },428 WorkflowID: "visibility-filtering-test1",429 })430 s.Nil(err2)431 s.Equal(1, len(resp.Executions))432 s.Equal(workflowExecution1.WorkflowID, resp.Executions[0].Execution.WorkflowID)433 // Close both executions434 err3 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, &p.RecordWorkflowExecutionClosedRequest{435 DomainUUID: testDomainUUID,436 Execution: workflowExecution1,437 WorkflowTypeName: "visibility-workflow",438 StartTimestamp: startTime,439 CloseTimestamp: time.Now().UnixNano(),440 })441 s.Nil(err3)442 closeReq := &p.RecordWorkflowExecutionClosedRequest{443 DomainUUID: testDomainUUID,444 Execution: workflowExecution2,445 WorkflowTypeName: "visibility-workflow",446 StartTimestamp: startTime,447 CloseTimestamp: time.Now().UnixNano(),448 HistoryLength: 3,449 }450 err4 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, closeReq)451 s.Nil(err4)452 // List closed with filtering453 resp, err5 := s.VisibilityMgr.ListClosedWorkflowExecutionsByWorkflowID(ctx, &p.ListWorkflowExecutionsByWorkflowIDRequest{454 ListWorkflowExecutionsRequest: p.ListWorkflowExecutionsRequest{455 DomainUUID: testDomainUUID,456 PageSize: 2,457 EarliestTime: startTime,458 LatestTime: startTime,459 },460 WorkflowID: "visibility-filtering-test2",461 })462 s.Nil(err5)463 s.Equal(1, len(resp.Executions))464 s.assertClosedExecutionEquals(closeReq, resp.Executions[0])465}466// TestFilteringByCloseStatus test467func (s *DBVisibilityPersistenceSuite) TestFilteringByCloseStatus() {468 ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)469 defer cancel()470 testDomainUUID := uuid.New()471 startTime := time.Now().UnixNano()472 // Create 2 executions473 workflowExecution1 := types.WorkflowExecution{474 WorkflowID: "visibility-filtering-test1",475 RunID: "fb15e4b5-356f-466d-8c6d-a29223e5c536",476 }477 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, &p.RecordWorkflowExecutionStartedRequest{478 DomainUUID: testDomainUUID,479 Execution: workflowExecution1,480 WorkflowTypeName: "visibility-workflow",481 StartTimestamp: startTime,482 })483 s.Nil(err0)484 workflowExecution2 := types.WorkflowExecution{485 WorkflowID: "visibility-filtering-test2",486 RunID: "843f6fc7-102a-4c63-a2d4-7c653b01bf52",487 }488 err1 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, &p.RecordWorkflowExecutionStartedRequest{489 DomainUUID: testDomainUUID,490 Execution: workflowExecution2,491 WorkflowTypeName: "visibility-workflow",492 StartTimestamp: startTime,493 })494 s.Nil(err1)495 // Close both executions with different status496 err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, &p.RecordWorkflowExecutionClosedRequest{497 DomainUUID: testDomainUUID,498 Execution: workflowExecution1,499 WorkflowTypeName: "visibility-workflow",500 StartTimestamp: startTime,501 CloseTimestamp: time.Now().UnixNano(),502 Status: types.WorkflowExecutionCloseStatusCompleted,503 })504 s.Nil(err2)505 closeReq := &p.RecordWorkflowExecutionClosedRequest{506 DomainUUID: testDomainUUID,507 Execution: workflowExecution2,508 WorkflowTypeName: "visibility-workflow",509 StartTimestamp: startTime,510 Status: types.WorkflowExecutionCloseStatusFailed,511 CloseTimestamp: time.Now().UnixNano(),512 HistoryLength: 3,513 }514 err3 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, closeReq)515 s.Nil(err3)516 // List closed with filtering517 resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutionsByStatus(ctx, &p.ListClosedWorkflowExecutionsByStatusRequest{518 ListWorkflowExecutionsRequest: p.ListWorkflowExecutionsRequest{519 DomainUUID: testDomainUUID,520 PageSize: 2,521 EarliestTime: startTime,522 LatestTime: startTime,523 },524 Status: types.WorkflowExecutionCloseStatusFailed,525 })526 s.Nil(err4)527 s.Equal(1, len(resp.Executions))528 s.assertClosedExecutionEquals(closeReq, resp.Executions[0])529}530// TestGetClosedExecution test531func (s *DBVisibilityPersistenceSuite) TestGetClosedExecution() {532 ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)533 defer cancel()534 testDomainUUID := uuid.New()535 workflowExecution := types.WorkflowExecution{536 WorkflowID: "visibility-workflow-test",537 RunID: "a3dbc7bf-deb1-4946-b57c-cf0615ea553f",538 }539 startTime := time.Now().Add(time.Second * -5).UnixNano()540 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, &p.RecordWorkflowExecutionStartedRequest{541 DomainUUID: testDomainUUID,542 Execution: workflowExecution,543 WorkflowTypeName: "visibility-workflow",544 StartTimestamp: startTime,545 })546 s.Nil(err0)547 closedResp, err1 := s.VisibilityMgr.GetClosedWorkflowExecution(ctx, &p.GetClosedWorkflowExecutionRequest{548 DomainUUID: testDomainUUID,549 Execution: workflowExecution,550 })551 s.Error(err1)552 _, ok := err1.(*types.EntityNotExistsError)553 s.True(ok, "EntityNotExistsError")554 s.Nil(closedResp)555 closeReq := &p.RecordWorkflowExecutionClosedRequest{556 DomainUUID: testDomainUUID,557 Execution: workflowExecution,558 WorkflowTypeName: "visibility-workflow",559 StartTimestamp: startTime,560 Status: types.WorkflowExecutionCloseStatusFailed,561 CloseTimestamp: time.Now().UnixNano(),562 HistoryLength: 3,563 }564 err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, closeReq)565 s.Nil(err2)566 resp, err3 := s.VisibilityMgr.GetClosedWorkflowExecution(ctx, &p.GetClosedWorkflowExecutionRequest{567 DomainUUID: testDomainUUID,568 Execution: workflowExecution,569 })570 s.Nil(err3)571 s.assertClosedExecutionEquals(closeReq, resp.Execution)572}573// TestClosedWithoutStarted test574func (s *DBVisibilityPersistenceSuite) TestClosedWithoutStarted() {575 ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)576 defer cancel()577 testDomainUUID := uuid.New()578 workflowExecution := types.WorkflowExecution{579 WorkflowID: "visibility-workflow-test",580 RunID: "1bdb0122-e8c9-4b35-b6f8-d692ab259b09",581 }582 closedResp, err0 := s.VisibilityMgr.GetClosedWorkflowExecution(ctx, &p.GetClosedWorkflowExecutionRequest{583 DomainUUID: testDomainUUID,584 Execution: workflowExecution,585 })586 s.Error(err0)587 _, ok := err0.(*types.EntityNotExistsError)588 s.True(ok, "EntityNotExistsError")589 s.Nil(closedResp)590 closeReq := &p.RecordWorkflowExecutionClosedRequest{591 DomainUUID: testDomainUUID,592 Execution: workflowExecution,593 WorkflowTypeName: "visibility-workflow",594 StartTimestamp: time.Now().Add(time.Second * -5).UnixNano(),595 Status: types.WorkflowExecutionCloseStatusFailed,596 CloseTimestamp: time.Now().UnixNano(),597 HistoryLength: 3,598 }599 err1 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, closeReq)600 s.Nil(err1)601 resp, err2 := s.VisibilityMgr.GetClosedWorkflowExecution(ctx, &p.GetClosedWorkflowExecutionRequest{602 DomainUUID: testDomainUUID,603 Execution: workflowExecution,604 })605 s.Nil(err2)606 s.assertClosedExecutionEquals(closeReq, resp.Execution)607}608// TestMultipleUpserts test609func (s *DBVisibilityPersistenceSuite) TestMultipleUpserts() {610 ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)611 defer cancel()612 testDomainUUID := uuid.New()613 workflowExecution := types.WorkflowExecution{614 WorkflowID: "visibility-workflow-test",615 RunID: "a3dbc7bf-deb1-4946-b57c-cf0615ea553f",616 }617 startTime := time.Now().Add(time.Second * -5).UnixNano()618 closeReq := &p.RecordWorkflowExecutionClosedRequest{619 DomainUUID: testDomainUUID,620 Execution: workflowExecution,621 WorkflowTypeName: "visibility-workflow",622 StartTimestamp: startTime,623 Status: types.WorkflowExecutionCloseStatusFailed,624 CloseTimestamp: time.Now().UnixNano(),625 HistoryLength: 3,626 }627 count := 3628 for i := 0; i < count; i++ {629 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, &p.RecordWorkflowExecutionStartedRequest{630 DomainUUID: testDomainUUID,631 Execution: workflowExecution,632 WorkflowTypeName: "visibility-workflow",633 StartTimestamp: startTime,634 })635 s.Nil(err0)636 if i < count-1 {637 err1 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, closeReq)638 s.Nil(err1)639 }640 }641 resp, err3 := s.VisibilityMgr.GetClosedWorkflowExecution(ctx, &p.GetClosedWorkflowExecutionRequest{642 DomainUUID: testDomainUUID,643 Execution: workflowExecution,644 })645 s.Nil(err3)646 s.assertClosedExecutionEquals(closeReq, resp.Execution)647}648// TestDelete test649func (s *DBVisibilityPersistenceSuite) TestDelete() {650 ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)651 defer cancel()652 if s.VisibilityMgr.GetName() == "cassandra" {653 // this test is not applicable for cassandra654 return655 }656 nRows := 5657 testDomainUUID := uuid.New()658 startTime := time.Now().Add(time.Second * -5).UnixNano()659 for i := 0; i < nRows; i++ {660 workflowExecution := types.WorkflowExecution{661 WorkflowID: uuid.New(),662 RunID: uuid.New(),663 }664 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, &p.RecordWorkflowExecutionStartedRequest{665 DomainUUID: testDomainUUID,666 Execution: workflowExecution,667 WorkflowTypeName: "visibility-workflow",668 StartTimestamp: startTime,669 })670 s.Nil(err0)671 closeReq := &p.RecordWorkflowExecutionClosedRequest{672 DomainUUID: testDomainUUID,673 Execution: workflowExecution,674 WorkflowTypeName: "visibility-workflow",675 StartTimestamp: startTime,676 Status: types.WorkflowExecutionCloseStatusFailed,677 CloseTimestamp: time.Now().UnixNano(),678 HistoryLength: 3,679 }680 err1 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, closeReq)681 s.Nil(err1)682 }683 resp, err3 := s.VisibilityMgr.ListClosedWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{684 DomainUUID: testDomainUUID,685 EarliestTime: startTime,686 LatestTime: time.Now().UnixNano(),687 PageSize: 10,688 })689 s.Nil(err3)690 s.Equal(nRows, len(resp.Executions))691 remaining := nRows692 for _, row := range resp.Executions {693 err4 := s.VisibilityMgr.DeleteWorkflowExecution(ctx, &p.VisibilityDeleteWorkflowExecutionRequest{694 DomainID: testDomainUUID,695 RunID: row.GetExecution().GetRunID(),696 })697 s.Nil(err4)698 remaining--699 resp, err5 := s.VisibilityMgr.ListClosedWorkflowExecutions(ctx, &p.ListWorkflowExecutionsRequest{700 DomainUUID: testDomainUUID,701 EarliestTime: startTime,702 LatestTime: time.Now().UnixNano(),703 PageSize: 10,704 })705 s.Nil(err5)706 s.Equal(remaining, len(resp.Executions))707 }708}709// TestUpsertWorkflowExecution test710func (s *DBVisibilityPersistenceSuite) TestUpsertWorkflowExecution() {711 ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)712 defer cancel()713 tests := []struct {714 request *p.UpsertWorkflowExecutionRequest715 expected error...
visibility_persistence_suite_test.go
Source:visibility_persistence_suite_test.go
...89}90// TestBasicVisibility test91func (s *VisibilityPersistenceSuite) TestBasicVisibility() {92 testNamespaceUUID := namespace.ID(uuid.New())93 startTime := time.Now().UTC().Add(time.Second * -5)94 startReq := s.createOpenWorkflowRecord(testNamespaceUUID, "visibility-workflow-test", "visibility-workflow", startTime, "test-queue")95 resp, err1 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{96 NamespaceID: testNamespaceUUID,97 PageSize: 1,98 EarliestStartTime: startTime,99 LatestStartTime: startTime,100 })101 s.Nil(err1)102 s.Equal(1, len(resp.Executions))103 s.assertOpenExecutionEquals(startReq, resp.Executions[0])104 closeReq := s.createClosedWorkflowRecord(startReq, time.Now())105 resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{106 NamespaceID: testNamespaceUUID,107 PageSize: 1,108 EarliestStartTime: startTime,109 LatestStartTime: startTime,110 })111 s.Nil(err3)112 s.Equal(0, len(resp.Executions))113 resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{114 NamespaceID: testNamespaceUUID,115 PageSize: 1,116 EarliestStartTime: startTime,117 LatestStartTime: time.Now(),118 })119 s.Nil(err4)120 s.Equal(1, len(resp.Executions))121 s.assertClosedExecutionEquals(closeReq, resp.Executions[0])122}123// TestBasicVisibilityTimeSkew test124func (s *VisibilityPersistenceSuite) TestBasicVisibilityTimeSkew() {125 testNamespaceUUID := namespace.ID(uuid.New())126 startTime := time.Now()127 openRecord := s.createOpenWorkflowRecord(testNamespaceUUID, "visibility-workflow-test-time-skew", "visibility-workflow", startTime, "test-queue")128 resp, err1 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{129 NamespaceID: testNamespaceUUID,130 PageSize: 1,131 EarliestStartTime: startTime,132 LatestStartTime: startTime,133 })134 s.NoError(err1)135 s.Equal(1, len(resp.Executions))136 s.assertOpenExecutionEquals(openRecord, resp.Executions[0])137 closedRecord := s.createClosedWorkflowRecord(openRecord, startTime.Add(-10*time.Millisecond))138 resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{139 NamespaceID: testNamespaceUUID,140 PageSize: 1,141 EarliestStartTime: startTime,142 LatestStartTime: startTime,143 })144 s.NoError(err3)145 s.Equal(0, len(resp.Executions))146 resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{147 NamespaceID: testNamespaceUUID,148 PageSize: 1,149 EarliestStartTime: startTime.Add(-10 * time.Millisecond), // This is actually close_time150 LatestStartTime: startTime.Add(-10 * time.Millisecond),151 })152 s.NoError(err4)153 s.Equal(1, len(resp.Executions))154 s.assertClosedExecutionEquals(closedRecord, resp.Executions[0])155}156func (s *VisibilityPersistenceSuite) TestBasicVisibilityShortWorkflow() {157 testNamespaceUUID := namespace.ID(uuid.New())158 startTime := time.Now().UTC()159 openRecord := s.createOpenWorkflowRecord(testNamespaceUUID, "visibility-workflow-test-short-workflow", "visibility-workflow", startTime, "test-queue")160 closedRecord := s.createClosedWorkflowRecord(openRecord, startTime.Add(10*time.Millisecond))161 resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{162 NamespaceID: testNamespaceUUID,163 PageSize: 1,164 EarliestStartTime: startTime,165 LatestStartTime: startTime,166 })167 s.NoError(err3)168 s.Equal(0, len(resp.Executions))169 resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{170 NamespaceID: testNamespaceUUID,171 PageSize: 1,172 EarliestStartTime: startTime.Add(10 * time.Millisecond), // This is actually close_time173 LatestStartTime: startTime.Add(10 * time.Millisecond),174 })175 s.NoError(err4)176 s.Equal(1, len(resp.Executions))177 s.assertClosedExecutionEquals(closedRecord, resp.Executions[0])178}179// TestVisibilityPagination test180func (s *VisibilityPersistenceSuite) TestVisibilityPagination() {181 testNamespaceUUID := namespace.ID(uuid.New())182 // Create 2 executions183 startTime1 := time.Now().UTC()184 openRecord1 := s.createOpenWorkflowRecord(testNamespaceUUID, "visibility-pagination-test1", "visibility-workflow", startTime1, "test-queue")185 startTime2 := startTime1.Add(time.Second)186 openRecord2 := s.createOpenWorkflowRecord(testNamespaceUUID, "visibility-pagination-test2", "visibility-workflow", startTime2, "test-queue")187 // Get the first one188 resp, err2 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{189 NamespaceID: testNamespaceUUID,190 PageSize: 1,191 EarliestStartTime: startTime1,192 LatestStartTime: startTime2,193 })194 s.Nil(err2)195 s.Equal(1, len(resp.Executions))196 s.assertOpenExecutionEquals(openRecord2, resp.Executions[0])197 // Use token to get the second one198 resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{199 NamespaceID: testNamespaceUUID,200 PageSize: 1,201 EarliestStartTime: startTime1,202 LatestStartTime: startTime2,203 NextPageToken: resp.NextPageToken,204 })205 s.Nil(err3)206 s.Equal(1, len(resp.Executions))207 s.assertOpenExecutionEquals(openRecord1, resp.Executions[0])208 // It is possible to not return non empty token which is going to return empty result209 if len(resp.NextPageToken) != 0 {210 // Now should get empty result by using token211 resp, err4 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{212 NamespaceID: testNamespaceUUID,213 PageSize: 1,214 EarliestStartTime: startTime1,215 LatestStartTime: startTime2,216 NextPageToken: resp.NextPageToken,217 })218 s.Nil(err4)219 s.Equal(0, len(resp.Executions))220 }221}222// TestFilteringByStartTime test223func (s *VisibilityPersistenceSuite) TestFilteringByStartTime() {224 testNamespaceUUID := namespace.ID(uuid.New())225 startTime := time.Now()226 // Create 2 open workflows, one started 2hrs ago, the other started just now.227 openRecord1 := s.createOpenWorkflowRecord(testNamespaceUUID, "visibility-filtering-test1", "visibility-workflow-1", startTime.Add(-2*time.Hour), "test-queue")228 openRecord2 := s.createOpenWorkflowRecord(testNamespaceUUID, "visibility-filtering-test2", "visibility-workflow-2", startTime, "test-queue")229 // List open workflows with start time filter230 resp, err := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{231 NamespaceID: testNamespaceUUID,232 PageSize: 2,233 EarliestStartTime: time.Now().Add(-time.Hour),234 LatestStartTime: time.Now(),235 })236 s.NoError(err)237 s.Equal(1, len(resp.Executions))238 s.assertOpenExecutionEquals(openRecord2, resp.Executions[0])239 // List with WorkflowType filter in query string240 queryStr := fmt.Sprintf(`StartTime BETWEEN "%v" AND "%v"`, time.Now().Add(-time.Hour).Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano))241 resp, err = s.VisibilityMgr.ListWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequestV2{242 NamespaceID: testNamespaceUUID,243 PageSize: 2,244 Query: queryStr,245 })246 s.Nil(err)247 s.Equal(1, len(resp.Executions))248 s.assertOpenExecutionEquals(openRecord2, resp.Executions[0])249 queryStr = fmt.Sprintf(`StartTime BETWEEN "%v" AND "%v"`, time.Now().Add(-3*time.Hour).Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano))250 resp, err = s.VisibilityMgr.ListWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequestV2{251 NamespaceID: testNamespaceUUID,252 PageSize: 2,253 Query: queryStr,254 })255 s.Nil(err)256 s.Equal(2, len(resp.Executions))257 resp, err = s.VisibilityMgr.ListWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequestV2{258 NamespaceID: testNamespaceUUID,259 PageSize: 2,260 Query: queryStr + ` AND WorkflowType = "visibility-workflow-1"`,261 })262 s.Nil(err)263 s.Equal(1, len(resp.Executions))264 s.assertOpenExecutionEquals(openRecord1, resp.Executions[0])265}266// TestFilteringByType test267func (s *VisibilityPersistenceSuite) TestFilteringByType() {268 testNamespaceUUID := namespace.ID(uuid.New())269 startTime := time.Now()270 // Create 2 executions271 openRecord1 := s.createOpenWorkflowRecord(testNamespaceUUID, "visibility-filtering-test1", "visibility-workflow-1", startTime, "test-queue")272 openRecord2 := s.createOpenWorkflowRecord(testNamespaceUUID, "visibility-filtering-test2", "visibility-workflow-2", startTime, "test-queue")273 // List open with filtering274 resp, err2 := s.VisibilityMgr.ListOpenWorkflowExecutionsByType(s.ctx, &manager.ListWorkflowExecutionsByTypeRequest{275 ListWorkflowExecutionsRequest: &manager.ListWorkflowExecutionsRequest{276 NamespaceID: testNamespaceUUID,277 PageSize: 2,278 EarliestStartTime: startTime,279 LatestStartTime: startTime,280 },281 WorkflowTypeName: "visibility-workflow-1",282 })283 s.Nil(err2)284 s.Equal(1, len(resp.Executions))285 s.assertOpenExecutionEquals(openRecord1, resp.Executions[0])286 // List with WorkflowType filter in query string287 resp, err := s.VisibilityMgr.ListWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequestV2{288 NamespaceID: testNamespaceUUID,289 PageSize: 2,290 Query: fmt.Sprintf(`WorkflowType = "visibility-workflow-1"`),291 })292 s.Nil(err)293 s.Equal(1, len(resp.Executions))294 s.assertOpenExecutionEquals(openRecord1, resp.Executions[0])295 // Close both executions296 s.createClosedWorkflowRecord(openRecord1, time.Now())297 closedRecord2 := s.createClosedWorkflowRecord(openRecord2, time.Now())298 // List closed with filtering299 resp, err5 := s.VisibilityMgr.ListClosedWorkflowExecutionsByType(s.ctx, &manager.ListWorkflowExecutionsByTypeRequest{300 ListWorkflowExecutionsRequest: &manager.ListWorkflowExecutionsRequest{301 NamespaceID: testNamespaceUUID,302 PageSize: 2,303 EarliestStartTime: startTime,304 LatestStartTime: time.Now(),305 },306 WorkflowTypeName: "visibility-workflow-2",307 })308 s.Nil(err5)309 s.Equal(1, len(resp.Executions))310 s.assertClosedExecutionEquals(closedRecord2, resp.Executions[0])311 // List with WorkflowType filter in query string312 resp, err = s.VisibilityMgr.ListWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequestV2{313 NamespaceID: testNamespaceUUID,314 PageSize: 2,315 Query: fmt.Sprintf(`WorkflowType = "visibility-workflow-2"`),316 })317 s.Nil(err)318 s.Equal(1, len(resp.Executions))319 s.assertClosedExecutionEquals(closedRecord2, resp.Executions[0])320}321// TestFilteringByWorkflowID test322func (s *VisibilityPersistenceSuite) TestFilteringByWorkflowID() {323 testNamespaceUUID := namespace.ID(uuid.New())324 startTime := time.Now()325 // Create 2 executions326 openRecord1 := s.createOpenWorkflowRecord(testNamespaceUUID, "visibility-filtering-test1", "visibility-workflow", startTime, "test-queue")327 openRecord2 := s.createOpenWorkflowRecord(testNamespaceUUID, "visibility-filtering-test2", "visibility-workflow", startTime, "test-queue")328 // List open with filtering329 resp, err2 := s.VisibilityMgr.ListOpenWorkflowExecutionsByWorkflowID(s.ctx, &manager.ListWorkflowExecutionsByWorkflowIDRequest{330 ListWorkflowExecutionsRequest: &manager.ListWorkflowExecutionsRequest{331 NamespaceID: testNamespaceUUID,332 PageSize: 2,333 EarliestStartTime: startTime,334 LatestStartTime: startTime,335 },336 WorkflowID: "visibility-filtering-test1",337 })338 s.Nil(err2)339 s.Equal(1, len(resp.Executions))340 s.assertOpenExecutionEquals(openRecord1, resp.Executions[0])341 // List workflow with workflowID filter in query string342 resp, err := s.VisibilityMgr.ListWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequestV2{343 NamespaceID: testNamespaceUUID,344 PageSize: 2,345 Query: fmt.Sprintf(`WorkflowId = "visibility-filtering-test1"`),346 })347 s.Nil(err)348 s.Equal(1, len(resp.Executions))349 s.assertOpenExecutionEquals(openRecord1, resp.Executions[0])350 // Close both executions351 s.createClosedWorkflowRecord(openRecord1, time.Now())352 closedRecord2 := s.createClosedWorkflowRecord(openRecord2, time.Now())353 // List closed with filtering354 resp, err5 := s.VisibilityMgr.ListClosedWorkflowExecutionsByWorkflowID(s.ctx, &manager.ListWorkflowExecutionsByWorkflowIDRequest{355 ListWorkflowExecutionsRequest: &manager.ListWorkflowExecutionsRequest{356 NamespaceID: testNamespaceUUID,357 PageSize: 2,358 EarliestStartTime: startTime,359 LatestStartTime: time.Now(),360 },361 WorkflowID: "visibility-filtering-test2",362 })363 s.Nil(err5)364 s.Equal(1, len(resp.Executions))365 s.assertClosedExecutionEquals(closedRecord2, resp.Executions[0])366 // List workflow with workflowID filter in query string367 resp, err = s.VisibilityMgr.ListWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequestV2{368 NamespaceID: testNamespaceUUID,369 PageSize: 2,370 Query: fmt.Sprintf(`WorkflowId = "visibility-filtering-test2"`),371 })372 s.Nil(err)373 s.Equal(1, len(resp.Executions))374 s.assertClosedExecutionEquals(closedRecord2, resp.Executions[0])375}376// TestFilteringByStatus test377func (s *VisibilityPersistenceSuite) TestFilteringByStatus() {378 testNamespaceUUID := namespace.ID(uuid.New())379 startTime := time.Now()380 // Create 2 executions381 workflowExecution1 := commonpb.WorkflowExecution{382 WorkflowId: "visibility-filtering-test1",383 RunId: "fb15e4b5-356f-466d-8c6d-a29223e5c536",384 }385 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(s.ctx, &manager.RecordWorkflowExecutionStartedRequest{386 VisibilityRequestBase: &manager.VisibilityRequestBase{387 NamespaceID: testNamespaceUUID,388 Execution: workflowExecution1,389 WorkflowTypeName: "visibility-workflow",390 StartTime: startTime,391 },392 })393 s.Nil(err0)394 workflowExecution2 := commonpb.WorkflowExecution{395 WorkflowId: "visibility-filtering-test2",396 RunId: "843f6fc7-102a-4c63-a2d4-7c653b01bf52",397 }398 err1 := s.VisibilityMgr.RecordWorkflowExecutionStarted(s.ctx, &manager.RecordWorkflowExecutionStartedRequest{399 VisibilityRequestBase: &manager.VisibilityRequestBase{400 NamespaceID: testNamespaceUUID,401 Execution: workflowExecution2,402 WorkflowTypeName: "visibility-workflow",403 StartTime: startTime,404 },405 })406 s.Nil(err1)407 // Close both executions with different status408 err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(s.ctx, &manager.RecordWorkflowExecutionClosedRequest{409 VisibilityRequestBase: &manager.VisibilityRequestBase{410 NamespaceID: testNamespaceUUID,411 Execution: workflowExecution1,412 WorkflowTypeName: "visibility-workflow",413 StartTime: startTime,414 Status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,415 },416 CloseTime: time.Now(),417 })418 s.Nil(err2)419 closeReq := &manager.RecordWorkflowExecutionClosedRequest{420 VisibilityRequestBase: &manager.VisibilityRequestBase{421 NamespaceID: testNamespaceUUID,422 Execution: workflowExecution2,423 WorkflowTypeName: "visibility-workflow",424 StartTime: startTime,425 Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,426 },427 CloseTime: time.Now(),428 HistoryLength: 3,429 }430 err3 := s.VisibilityMgr.RecordWorkflowExecutionClosed(s.ctx, closeReq)431 s.Nil(err3)432 // List closed with filtering433 resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutionsByStatus(s.ctx, &manager.ListClosedWorkflowExecutionsByStatusRequest{434 ListWorkflowExecutionsRequest: &manager.ListWorkflowExecutionsRequest{435 NamespaceID: testNamespaceUUID,436 PageSize: 2,437 EarliestStartTime: startTime,438 LatestStartTime: time.Now(),439 },440 Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,441 })442 s.Nil(err4)443 s.Equal(1, len(resp.Executions))444 s.assertClosedExecutionEquals(closeReq, resp.Executions[0])445 resp, err := s.VisibilityMgr.ListWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequestV2{446 NamespaceID: testNamespaceUUID,447 PageSize: 5,448 Query: `ExecutionStatus = "Failed"`,449 })450 s.Nil(err)451 s.Equal(1, len(resp.Executions))452 s.assertClosedExecutionEquals(closeReq, resp.Executions[0])453}454// TestDelete test455func (s *VisibilityPersistenceSuite) TestDeleteWorkflow() {456 openRows := 10457 closedRows := 5458 testNamespaceUUID := namespace.ID(uuid.New())459 closeTime := time.Now().UTC()460 startTime := closeTime.Add(-5 * time.Second)461 var pendingExecutions []commonpb.WorkflowExecution462 for i := 0; i < openRows; i++ {463 workflowExecution := commonpb.WorkflowExecution{464 WorkflowId: uuid.New(),465 RunId: uuid.New(),466 }467 pendingExecutions = append(pendingExecutions, workflowExecution)468 err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(s.ctx, &manager.RecordWorkflowExecutionStartedRequest{469 VisibilityRequestBase: &manager.VisibilityRequestBase{470 NamespaceID: testNamespaceUUID,471 Execution: workflowExecution,472 WorkflowTypeName: "visibility-workflow",473 StartTime: startTime,474 },475 })476 s.Nil(err0)477 }478 for i := 0; i < closedRows; i++ {479 closeReq := &manager.RecordWorkflowExecutionClosedRequest{480 VisibilityRequestBase: &manager.VisibilityRequestBase{481 NamespaceID: testNamespaceUUID,482 Execution: pendingExecutions[i],483 WorkflowTypeName: "visibility-workflow",484 StartTime: startTime,485 Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,486 },487 CloseTime: closeTime,488 HistoryLength: 3,489 }490 err1 := s.VisibilityMgr.RecordWorkflowExecutionClosed(s.ctx, closeReq)491 s.Nil(err1)492 }493 resp, err3 := s.VisibilityMgr.ListClosedWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{494 NamespaceID: testNamespaceUUID,495 EarliestStartTime: startTime,496 LatestStartTime: closeTime,497 PageSize: 10,498 })499 s.Nil(err3)500 s.Equal(closedRows, len(resp.Executions))501 // Delete closed workflow502 for _, row := range resp.Executions {503 err4 := s.VisibilityMgr.DeleteWorkflowExecution(s.ctx, &manager.VisibilityDeleteWorkflowExecutionRequest{504 NamespaceID: testNamespaceUUID,505 WorkflowID: row.GetExecution().GetWorkflowId(),506 RunID: row.GetExecution().GetRunId(),507 CloseTime: &closeTime,508 })509 s.Nil(err4)510 }511 resp, err5 := s.VisibilityMgr.ListClosedWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{512 NamespaceID: testNamespaceUUID,513 EarliestStartTime: startTime,514 LatestStartTime: closeTime,515 PageSize: 10,516 })517 s.Nil(err5)518 s.Equal(0, len(resp.Executions))519 resp, err6 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{520 NamespaceID: testNamespaceUUID,521 EarliestStartTime: startTime,522 LatestStartTime: closeTime,523 PageSize: 10,524 })525 s.Nil(err6)526 s.Equal(openRows-closedRows, len(resp.Executions))527 // Delete open workflow528 for _, row := range resp.Executions {529 err7 := s.VisibilityMgr.DeleteWorkflowExecution(s.ctx, &manager.VisibilityDeleteWorkflowExecutionRequest{530 NamespaceID: testNamespaceUUID,531 WorkflowID: row.GetExecution().GetWorkflowId(),532 RunID: row.GetExecution().GetRunId(),533 StartTime: &startTime,534 })535 s.Nil(err7)536 }537 resp, err8 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{538 NamespaceID: testNamespaceUUID,539 EarliestStartTime: startTime,540 LatestStartTime: closeTime,541 PageSize: 10,542 })543 s.Nil(err8)544 s.Equal(0, len(resp.Executions))545}546// TestUpsertWorkflowExecution test547func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {548 tests := []struct {549 request *manager.UpsertWorkflowExecutionRequest550 expected error551 }{552 {553 request: &manager.UpsertWorkflowExecutionRequest{554 VisibilityRequestBase: &manager.VisibilityRequestBase{555 NamespaceID: "",556 Namespace: "",557 Execution: commonpb.WorkflowExecution{},558 WorkflowTypeName: "",559 StartTime: time.Time{},560 ExecutionTime: time.Time{},561 TaskID: 0,562 Memo: nil,563 SearchAttributes: &commonpb.SearchAttributes{564 IndexedFields: map[string]*commonpb.Payload{565 searchattribute.TemporalChangeVersion: payload.EncodeBytes([]byte("dummy")),566 },567 },568 Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,569 },570 },571 expected: nil,572 },573 {574 request: &manager.UpsertWorkflowExecutionRequest{575 VisibilityRequestBase: &manager.VisibilityRequestBase{576 NamespaceID: "",577 Namespace: "",578 Execution: commonpb.WorkflowExecution{},579 WorkflowTypeName: "",580 StartTime: time.Time{},581 ExecutionTime: time.Time{},582 TaskID: 0,583 Memo: nil,584 SearchAttributes: nil,585 Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,586 },587 },588 // To avoid blocking the task queue processors on non-ElasticSearch visibility stores589 // we simply treat any attempts to perform Upserts as "no-ops"590 // Attempts to Scan, Count or List will still fail for non-ES stores.591 expected: nil,592 },593 }594 for _, test := range tests {595 s.Equal(test.expected, s.VisibilityMgr.UpsertWorkflowExecution(s.ctx, test.request))596 }597}598// TestAdvancedVisibilityPagination test599func (s *VisibilityPersistenceSuite) TestAdvancedVisibilityPagination() {600 testNamespaceUUID := namespace.ID(uuid.New())601 // Generate 5 workflow records, keep 2 open and 3 closed.602 var startReqs []*manager.RecordWorkflowExecutionStartedRequest603 var closeReqs []*manager.RecordWorkflowExecutionClosedRequest604 for i := 0; i < 5; i++ {605 startReq := s.createOpenWorkflowRecord(testNamespaceUUID, fmt.Sprintf("advanced-visibility-%v", i), "visibility-workflow", time.Now(), "test-queue")606 if i <= 1 {607 startReqs = append([]*manager.RecordWorkflowExecutionStartedRequest{startReq}, startReqs...)608 } else {609 closeReq := s.createClosedWorkflowRecord(startReq, time.Now())610 closeReqs = append([]*manager.RecordWorkflowExecutionClosedRequest{closeReq}, closeReqs...)611 }612 }613 for pageSize := 1; pageSize <= 5; pageSize++ {614 executions := s.listWithPagination(testNamespaceUUID, 5)615 s.Equal(5, len(executions))616 for i := 0; i < 5; i++ {617 if i <= 1 {618 s.assertOpenExecutionEquals(startReqs[i], executions[i])619 } else {620 s.assertClosedExecutionEquals(closeReqs[i-2], executions[i])621 }622 }623 }624}625func (s *VisibilityPersistenceSuite) listWithPagination(namespaceID namespace.ID, pageSize int) []*workflowpb.WorkflowExecutionInfo {626 var executions []*workflowpb.WorkflowExecutionInfo627 resp, err := s.VisibilityMgr.ListWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequestV2{628 NamespaceID: namespaceID,629 PageSize: pageSize,630 Query: "",631 })632 s.Nil(err)633 executions = append(executions, resp.Executions...)634 for len(resp.NextPageToken) > 0 {635 resp, err = s.VisibilityMgr.ListWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequestV2{636 NamespaceID: namespaceID,637 PageSize: pageSize,638 Query: "",639 NextPageToken: resp.NextPageToken,640 })641 s.Nil(err)642 executions = append(executions, resp.Executions...)643 }644 return executions645}646func (s *VisibilityPersistenceSuite) createClosedWorkflowRecord(647 startReq *manager.RecordWorkflowExecutionStartedRequest,648 closeTime time.Time,649) *manager.RecordWorkflowExecutionClosedRequest {650 closeReq := &manager.RecordWorkflowExecutionClosedRequest{651 VisibilityRequestBase: &manager.VisibilityRequestBase{652 NamespaceID: startReq.NamespaceID,653 Execution: startReq.Execution,654 WorkflowTypeName: startReq.WorkflowTypeName,655 StartTime: startReq.StartTime,656 },657 CloseTime: closeTime,658 HistoryLength: 5,659 }660 err := s.VisibilityMgr.RecordWorkflowExecutionClosed(s.ctx, closeReq)661 s.Nil(err)662 return closeReq663}664func (s *VisibilityPersistenceSuite) createOpenWorkflowRecord(665 namespaceID namespace.ID,666 workflowID string,667 workflowType string,668 startTime time.Time,669 taskQueue string,670) *manager.RecordWorkflowExecutionStartedRequest {671 workflowExecution := commonpb.WorkflowExecution{672 WorkflowId: workflowID,673 RunId: uuid.New(),674 }675 startReq := &manager.RecordWorkflowExecutionStartedRequest{676 VisibilityRequestBase: &manager.VisibilityRequestBase{677 NamespaceID: namespaceID,678 Execution: workflowExecution,679 WorkflowTypeName: workflowType,680 StartTime: startTime,681 TaskQueue: taskQueue,682 },683 }684 err := s.VisibilityMgr.RecordWorkflowExecutionStarted(s.ctx, startReq)685 s.Nil(err)686 return startReq687}688func (s *VisibilityPersistenceSuite) assertClosedExecutionEquals(689 req *manager.RecordWorkflowExecutionClosedRequest, resp *workflowpb.WorkflowExecutionInfo) {690 s.Equal(req.Execution.RunId, resp.Execution.RunId)691 s.Equal(req.Execution.WorkflowId, resp.Execution.WorkflowId)692 s.Equal(req.WorkflowTypeName, resp.GetType().GetName())693 s.Equal(persistence.UnixMilliseconds(req.StartTime), persistence.UnixMilliseconds(timestamp.TimeValue(resp.GetStartTime())))694 s.Equal(persistence.UnixMilliseconds(req.CloseTime), persistence.UnixMilliseconds(timestamp.TimeValue(resp.GetCloseTime())))695 s.Equal(req.Status, resp.GetStatus())696 s.Equal(req.HistoryLength, resp.HistoryLength)697}698func (s *VisibilityPersistenceSuite) assertOpenExecutionEquals(699 req *manager.RecordWorkflowExecutionStartedRequest, resp *workflowpb.WorkflowExecutionInfo) {700 s.Equal(req.Execution.GetRunId(), resp.Execution.GetRunId())...
cron_test.go
Source:cron_test.go
...73 s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))74 respondFailed := false75 seeRetry := false76 wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,77 previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {78 if !respondFailed {79 respondFailed = true80 return []*commandpb.Command{81 {82 CommandType: enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION,83 Attributes: &commandpb.Command_FailWorkflowExecutionCommandAttributes{84 FailWorkflowExecutionCommandAttributes: &commandpb.FailWorkflowExecutionCommandAttributes{85 Failure: failure.NewServerFailure("cron error for retry", false),86 }},87 }}, nil88 }89 startEvent := history.Events[0]90 seeRetry = startEvent.GetWorkflowExecutionStartedEventAttributes().Initiator == enumspb.CONTINUE_AS_NEW_INITIATOR_RETRY91 return []*commandpb.Command{92 {93 CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,94 Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{95 CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{96 Result: nil,97 }},98 }}, nil99 }100 poller := &TaskPoller{101 Engine: s.engine,102 Namespace: s.namespace,103 TaskQueue: &taskqueuepb.TaskQueue{Name: tl},104 Identity: identity,105 WorkflowTaskHandler: wtHandler,106 Logger: s.Logger,107 T: s.T(),108 }109 s.Logger.Info("Process first cron run which fails")110 _, err := poller.PollAndProcessWorkflowTask(true, false)111 s.NoError(err)112 s.Logger.Info("Process first cron run which completes")113 _, err = poller.PollAndProcessWorkflowTask(true, false)114 s.NoError(err)115 s.True(seeRetry)116}117func (s *integrationSuite) TestCronWorkflow() {118 id := "integration-wf-cron-test"119 wt := "integration-wf-cron-type"120 tl := "integration-wf-cron-taskqueue"121 identity := "worker1"122 cronSchedule := "@every 3s"123 targetBackoffDuration := time.Second * 3124 backoffDurationTolerance := time.Millisecond * 500125 memo := &commonpb.Memo{126 Fields: map[string]*commonpb.Payload{"memoKey": payload.EncodeString("memoVal")},127 }128 searchAttr := &commonpb.SearchAttributes{129 IndexedFields: map[string]*commonpb.Payload{130 "CustomKeywordField": payload.EncodeString(`"1"`),131 },132 }133 // can't do simply s.Equal because "type" is added134 checkSearchAttrs := func(sa *commonpb.SearchAttributes) {135 field := sa.IndexedFields["CustomKeywordField"]136 s.Equal(searchAttr.IndexedFields["CustomKeywordField"].Data, field.Data)137 s.Equal([]byte("Keyword"), field.Metadata["type"])138 }139 request := &workflowservice.StartWorkflowExecutionRequest{140 RequestId: uuid.New(),141 Namespace: s.namespace,142 WorkflowId: id,143 WorkflowType: &commonpb.WorkflowType{Name: wt},144 TaskQueue: &taskqueuepb.TaskQueue{Name: tl},145 Input: nil,146 WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),147 WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),148 Identity: identity,149 CronSchedule: cronSchedule, // minimum interval by standard spec is 1m (* * * * *, use non-standard descriptor for short interval for test150 Memo: memo,151 SearchAttributes: searchAttr,152 }153 // Because of rounding in GetBackoffForNextSchedule, we'll tend to stay aligned to whatever154 // phase we start in relative to second boundaries, but drift slightly later within the second155 // over time. If we cross a second boundary, one of our intervals will end up being 2s instead156 // of 3s. To avoid this, wait until we can start early in the second.157 for time.Now().Nanosecond()/int(time.Millisecond) > 150 {158 time.Sleep(50 * time.Millisecond)159 }160 startWorkflowTS := time.Now().UTC()161 we, err0 := s.engine.StartWorkflowExecution(NewContext(), request)162 s.NoError(err0)163 s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))164 var executions []*commonpb.WorkflowExecution165 wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,166 previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {167 if previousStartedEventID == common.EmptyEventID {168 startedEvent := history.Events[0]169 if startedEvent.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED {170 return []*commandpb.Command{171 {172 CommandType: enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION,173 Attributes: &commandpb.Command_FailWorkflowExecutionCommandAttributes{FailWorkflowExecutionCommandAttributes: &commandpb.FailWorkflowExecutionCommandAttributes{174 Failure: failure.NewServerFailure("incorrect first event", true),175 }},176 }}, nil177 }178 // Just check that it can be decoded179 s.decodePayloadsInt(startedEvent.GetWorkflowExecutionStartedEventAttributes().GetLastCompletionResult())180 }181 executions = append(executions, execution)182 if len(executions) >= 3 {183 return []*commandpb.Command{184 {185 CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,186 Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{187 Result: payloads.EncodeString("cron-test-result"),188 }},189 }}, nil190 }191 return []*commandpb.Command{192 {193 CommandType: enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION,194 Attributes: &commandpb.Command_FailWorkflowExecutionCommandAttributes{FailWorkflowExecutionCommandAttributes: &commandpb.FailWorkflowExecutionCommandAttributes{195 Failure: failure.NewServerFailure("cron-test-error", false),196 }},197 }}, nil198 }199 poller := &TaskPoller{200 Engine: s.engine,201 Namespace: s.namespace,202 TaskQueue: &taskqueuepb.TaskQueue{Name: tl},203 Identity: identity,204 WorkflowTaskHandler: wtHandler,205 Logger: s.Logger,206 T: s.T(),207 }208 startFilter := &filterpb.StartTimeFilter{}209 startFilter.EarliestTime = &startWorkflowTS210 startFilter.LatestTime = timestamp.TimePtr(time.Now().UTC())211 // Sleep some time before checking the open executions.212 // This will not cost extra time as the polling for first workflow task will be blocked for 3 seconds.213 time.Sleep(2 * time.Second)214 resp, err := s.engine.ListOpenWorkflowExecutions(NewContext(), &workflowservice.ListOpenWorkflowExecutionsRequest{215 Namespace: s.namespace,216 MaximumPageSize: 100,217 StartTimeFilter: startFilter,218 Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_ExecutionFilter{ExecutionFilter: &filterpb.WorkflowExecutionFilter{219 WorkflowId: id,220 }},221 })222 s.NoError(err)223 s.Equal(1, len(resp.GetExecutions()))224 executionInfo := resp.GetExecutions()[0]225 s.Equal(targetBackoffDuration, executionInfo.GetExecutionTime().Sub(timestamp.TimeValue(executionInfo.GetStartTime())))226 _, err = poller.PollAndProcessWorkflowTask(false, false)227 s.NoError(err)228 // Make sure the cron workflow start running at a proper time, in this case 3 seconds after the229 // startWorkflowExecution request230 backoffDuration := time.Now().UTC().Sub(startWorkflowTS)231 s.True(backoffDuration > targetBackoffDuration)232 s.True(backoffDuration < targetBackoffDuration+backoffDurationTolerance)233 _, err = poller.PollAndProcessWorkflowTask(false, false)234 s.NoError(err)235 _, err = poller.PollAndProcessWorkflowTask(false, false)236 s.NoError(err)237 s.Equal(3, len(executions))238 _, terminateErr := s.engine.TerminateWorkflowExecution(NewContext(), &workflowservice.TerminateWorkflowExecutionRequest{239 Namespace: s.namespace,240 WorkflowExecution: &commonpb.WorkflowExecution{241 WorkflowId: id,242 },243 })244 s.NoError(terminateErr)245 // first two should be failures246 for i := 0; i < 2; i++ {247 events := s.getHistory(s.namespace, executions[i])248 startAttrs := events[0].GetWorkflowExecutionStartedEventAttributes()249 s.Equal(memo, startAttrs.Memo)250 checkSearchAttrs(startAttrs.SearchAttributes)251 lastEvent := events[len(events)-1]252 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, lastEvent.GetEventType())253 failAttrs := lastEvent.GetWorkflowExecutionFailedEventAttributes()254 s.Equal("cron-test-error", failAttrs.GetFailure().GetMessage())255 s.Equal(executions[i+1].RunId, failAttrs.GetNewExecutionRunId())256 }257 // third should be completed258 events := s.getHistory(s.namespace, executions[2])259 startAttrs := events[0].GetWorkflowExecutionStartedEventAttributes()260 s.Equal(memo, startAttrs.Memo)261 checkSearchAttrs(startAttrs.SearchAttributes)262 lastEvent := events[len(events)-1]263 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, lastEvent.GetEventType())264 completedAttrs := lastEvent.GetWorkflowExecutionCompletedEventAttributes()265 s.Equal("cron-test-result", s.decodePayloadsString(completedAttrs.Result))266 startFilter.LatestTime = timestamp.TimePtr(time.Now().UTC())267 var closedExecutions []*workflowpb.WorkflowExecutionInfo268 for i := 0; i < 10; i++ {269 resp, err := s.engine.ListClosedWorkflowExecutions(NewContext(), &workflowservice.ListClosedWorkflowExecutionsRequest{270 Namespace: s.namespace,271 MaximumPageSize: 100,272 StartTimeFilter: startFilter,273 Filters: &workflowservice.ListClosedWorkflowExecutionsRequest_ExecutionFilter{ExecutionFilter: &filterpb.WorkflowExecutionFilter{274 WorkflowId: id,275 }},276 })277 s.NoError(err)278 if len(resp.GetExecutions()) == 4 {279 closedExecutions = resp.GetExecutions()280 break281 }282 time.Sleep(200 * time.Millisecond)283 }284 s.NotNil(closedExecutions)285 dweResponse, err := s.engine.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{286 Namespace: s.namespace,287 Execution: &commonpb.WorkflowExecution{288 WorkflowId: id,289 RunId: we.RunId,290 },291 })292 s.NoError(err)293 expectedExecutionTime := dweResponse.WorkflowExecutionInfo.GetStartTime().Add(3 * time.Second)294 s.Equal(expectedExecutionTime, timestamp.TimeValue(dweResponse.WorkflowExecutionInfo.GetExecutionTime()))295 sort.Slice(closedExecutions, func(i, j int) bool {296 return closedExecutions[i].GetStartTime().Before(timestamp.TimeValue(closedExecutions[j].GetStartTime()))297 })298 lastExecution := closedExecutions[0]299 for i := 1; i < 4; i++ {300 executionInfo := closedExecutions[i]301 expectedBackoff := executionInfo.GetExecutionTime().Sub(timestamp.TimeValue(lastExecution.GetExecutionTime()))302 // The execution time calculated based on last execution close time.303 // However, the current execution time is based on the current start time.304 // This code is to remove the diff between current start time and last execution close time.305 // TODO: Remove this line once we unify the time source306 executionTimeDiff := executionInfo.GetStartTime().Sub(timestamp.TimeValue(lastExecution.GetCloseTime()))307 // The backoff between any two executions should be a multiplier of the target backoff duration which is 3 in this test308 s.Equal(309 0,310 int((expectedBackoff-executionTimeDiff).Round(time.Second).Seconds())%int(targetBackoffDuration.Seconds()),311 "expected backoff %v-%v=%v should be multiplier of target backoff %v",312 expectedBackoff.Seconds(),313 executionTimeDiff.Seconds(),314 (expectedBackoff - executionTimeDiff).Round(time.Second).Seconds(),315 targetBackoffDuration.Seconds())316 lastExecution = executionInfo317 }318}319func (s *clientIntegrationSuite) TestCronWorkflowCompletionStates() {320 // Run a cron workflow that completes in (almost) all the possible ways:321 // Run 1: succeeds322 // Run 2: fails323 // Run 3: times out324 // Run 4: succeeds325 // Run 5: succeeds326 // Run 6: terminated before it runs327 // Continue-as-new is not tested (behavior is currently not correct)328 id := "integration-wf-cron-failed-test"329 cronSchedule := "@every 3s"330 targetBackoffDuration := 3 * time.Second331 workflowRunTimeout := 5 * time.Second332 tolerance := 500 * time.Millisecond333 runIDs := make(map[string]bool)334 wfCh := make(chan int)335 workflowFn := func(ctx workflow.Context) (string, error) {336 runIDs[workflow.GetInfo(ctx).WorkflowExecution.RunID] = true337 iteration := len(runIDs)338 wfCh <- iteration339 var lcr string340 switch iteration {341 case 1:342 s.False(workflow.HasLastCompletionResult(ctx))343 s.Nil(workflow.GetLastError(ctx))344 return "pass", nil345 case 2:346 s.True(workflow.HasLastCompletionResult(ctx))347 s.NoError(workflow.GetLastCompletionResult(ctx, &lcr))348 s.Equal(lcr, "pass")349 s.Nil(workflow.GetLastError(ctx))350 return "", errors.New("second error")351 case 3:352 s.True(workflow.HasLastCompletionResult(ctx))353 s.NoError(workflow.GetLastCompletionResult(ctx, &lcr))354 s.Equal(lcr, "pass")355 s.NotNil(workflow.GetLastError(ctx))356 s.Equal(workflow.GetLastError(ctx).Error(), "second error")357 workflow.Sleep(ctx, 10*time.Second) // cause wft timeout358 panic("should have been timed out on server already")359 case 4:360 s.True(workflow.HasLastCompletionResult(ctx))361 s.NoError(workflow.GetLastCompletionResult(ctx, &lcr))362 s.Equal(lcr, "pass")363 s.NotNil(workflow.GetLastError(ctx))364 s.Equal(workflow.GetLastError(ctx).Error(), "workflow timeout (type: StartToClose)")365 return "pass again", nil366 case 5:367 s.True(workflow.HasLastCompletionResult(ctx))368 s.NoError(workflow.GetLastCompletionResult(ctx, &lcr))369 s.Equal(lcr, "pass again")370 s.Nil(workflow.GetLastError(ctx))371 return "final pass", nil372 }373 panic("shouldn't get here")374 }375 s.worker.RegisterWorkflow(workflowFn)376 // Because of rounding in GetBackoffForNextSchedule, we'll tend to stay aligned to whatever377 // phase we start in relative to second boundaries, but drift slightly later within the second378 // over time. If we cross a second boundary, one of our intervals will end up being 2s instead379 // of 3s. To avoid this, wait until we can start early in the second.380 for time.Now().Nanosecond()/int(time.Millisecond) > 150 {381 time.Sleep(50 * time.Millisecond)382 }383 workflowOptions := sdkclient.StartWorkflowOptions{384 ID: id,385 TaskQueue: s.taskQueue,386 WorkflowRunTimeout: workflowRunTimeout,387 CronSchedule: cronSchedule,388 }389 ts := time.Now()390 startTs := ts391 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)392 defer cancel()393 _, err := s.sdkClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn)394 s.NoError(err)395 // check execution and history of first run396 exec := s.listOpenWorkflowExecutions(startTs, time.Now(), id, 1)[0]397 firstRunID := exec.GetExecution().RunId398 s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, exec.GetStatus())399 lastEvent := s.getLastEvent(s.namespace, exec.GetExecution())400 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, lastEvent.GetEventType())401 attrs0 := lastEvent.GetWorkflowExecutionStartedEventAttributes()402 s.Equal(cronSchedule, attrs0.CronSchedule)403 s.DurationNear(timestamp.DurationValue(attrs0.FirstWorkflowTaskBackoff), targetBackoffDuration, tolerance)404 s.Equal(firstRunID, attrs0.FirstExecutionRunId)405 s.Equal(enumspb.CONTINUE_AS_NEW_INITIATOR_CRON_SCHEDULE, attrs0.Initiator)406 s.Equal("", attrs0.ContinuedExecutionRunId)407 // wait for first run408 s.Equal(<-wfCh, 1)409 s.DurationNear(time.Since(ts), targetBackoffDuration, tolerance)410 ts = time.Now()411 // let first run finish, then check execution and history of second run412 time.Sleep(500 * time.Millisecond)413 exec = s.listOpenWorkflowExecutions(startTs, time.Now(), id, 1)[0]414 s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, exec.GetStatus())415 lastEvent = s.getLastEvent(s.namespace, exec.GetExecution())416 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, lastEvent.GetEventType())417 attrs0 = lastEvent.GetWorkflowExecutionStartedEventAttributes()418 s.Equal(cronSchedule, attrs0.CronSchedule)419 s.DurationNear(timestamp.DurationValue(attrs0.FirstWorkflowTaskBackoff), targetBackoffDuration, tolerance)420 s.Equal(firstRunID, attrs0.FirstExecutionRunId)421 s.Equal(enumspb.CONTINUE_AS_NEW_INITIATOR_CRON_SCHEDULE, attrs0.Initiator)422 s.Equal(firstRunID, attrs0.ContinuedExecutionRunId)423 // wait for second run424 s.Equal(<-wfCh, 2)425 s.DurationNear(time.Since(ts), targetBackoffDuration, tolerance)426 ts = time.Now()427 // don't bother checking started events for subsequent runs, we covered the important parts already428 // wait for third run429 s.Equal(<-wfCh, 3)430 s.DurationNear(time.Since(ts), targetBackoffDuration, tolerance)431 ts = time.Now()432 // wait for fourth run (third one waits for timeout after 5s, so will run after 6s)433 s.Equal(<-wfCh, 4)434 s.DurationNear(time.Since(ts), 2*targetBackoffDuration, tolerance)435 ts = time.Now()436 // wait for fifth run437 s.Equal(<-wfCh, 5)438 s.DurationNear(time.Since(ts), targetBackoffDuration, tolerance)439 ts = time.Now()440 // let fifth run finish and sixth get scheduled441 time.Sleep(500 * time.Millisecond)442 // then terminate443 s.NoError(s.sdkClient.TerminateWorkflow(ctx, id, "", "test is over"))444 closedExecutions := s.listClosedWorkflowExecutions(startTs, time.Now(), id, 6)445 exec = closedExecutions[5] // first: success446 s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, exec.GetStatus())447 lastEvent = s.getLastEvent(s.namespace, exec.GetExecution())448 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, lastEvent.GetEventType())449 attrs1 := lastEvent.GetWorkflowExecutionCompletedEventAttributes()450 s.Equal("pass", s.decodePayloadsString(attrs1.GetResult()))451 exec = closedExecutions[4] // second: fail452 s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, exec.GetStatus())453 lastEvent = s.getLastEvent(s.namespace, exec.GetExecution())454 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, lastEvent.GetEventType())455 attrs2 := lastEvent.GetWorkflowExecutionFailedEventAttributes()456 s.Equal("second error", attrs2.GetFailure().GetMessage())457 exec = closedExecutions[3] // third: timed out458 s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, exec.GetStatus())459 lastEvent = s.getLastEvent(s.namespace, exec.GetExecution())460 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT, lastEvent.GetEventType())461 attrs3 := lastEvent.GetWorkflowExecutionTimedOutEventAttributes()462 s.Equal(attrs3.GetRetryState(), enumspb.RETRY_STATE_RETRY_POLICY_NOT_SET)463 exec = closedExecutions[2] // fourth: success464 s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, exec.GetStatus())465 lastEvent = s.getLastEvent(s.namespace, exec.GetExecution())466 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, lastEvent.GetEventType())467 attrs1 = lastEvent.GetWorkflowExecutionCompletedEventAttributes()468 s.Equal("pass again", s.decodePayloadsString(attrs1.GetResult()))469 exec = closedExecutions[1] // fifth: success470 s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, exec.GetStatus())471 lastEvent = s.getLastEvent(s.namespace, exec.GetExecution())472 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, lastEvent.GetEventType())473 attrs1 = lastEvent.GetWorkflowExecutionCompletedEventAttributes()474 s.Equal("final pass", s.decodePayloadsString(attrs1.GetResult()))475 exec = closedExecutions[0] // sixth: terminated476 s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED, exec.GetStatus())477 events := s.getHistory(s.namespace, exec.GetExecution())478 s.Equal(2, len(events)) // only started and terminated479 lastEvent = events[len(events)-1]480 attrs4 := lastEvent.GetWorkflowExecutionTerminatedEventAttributes()481 s.Equal("test is over", attrs4.GetReason())482}483func (s *clientIntegrationSuite) listOpenWorkflowExecutions(start, end time.Time, id string, expectedNumber int) []*workflowpb.WorkflowExecutionInfo {484 s.T().Helper()485 for i := 0; i < 20; i++ {486 resp, err := s.sdkClient.ListOpenWorkflow(NewContext(), &workflowservice.ListOpenWorkflowExecutionsRequest{487 Namespace: s.namespace,488 MaximumPageSize: int32(2 * expectedNumber),489 StartTimeFilter: &filterpb.StartTimeFilter{EarliestTime: &start, LatestTime: &end},490 Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_ExecutionFilter{ExecutionFilter: &filterpb.WorkflowExecutionFilter{491 WorkflowId: id,492 }},493 })494 s.NoError(err)495 if len(resp.GetExecutions()) == expectedNumber {496 return resp.GetExecutions()497 }498 time.Sleep(250 * time.Millisecond)499 }500 s.FailNow("didn't get expected number")501 panic("unreached")502}503func (s *clientIntegrationSuite) listClosedWorkflowExecutions(start, end time.Time, id string, expectedNumber int) []*workflowpb.WorkflowExecutionInfo {504 s.T().Helper()505 for i := 0; i < 20; i++ {506 resp, err := s.sdkClient.ListClosedWorkflow(NewContext(), &workflowservice.ListClosedWorkflowExecutionsRequest{507 Namespace: s.namespace,508 MaximumPageSize: int32(2 * expectedNumber),509 StartTimeFilter: &filterpb.StartTimeFilter{EarliestTime: &start, LatestTime: &end},510 Filters: &workflowservice.ListClosedWorkflowExecutionsRequest_ExecutionFilter{ExecutionFilter: &filterpb.WorkflowExecutionFilter{511 WorkflowId: id,512 }},513 })514 s.NoError(err)515 if len(resp.GetExecutions()) == expectedNumber {516 return resp.GetExecutions()517 }518 time.Sleep(250 * time.Millisecond)519 }520 s.FailNow("didn't get expected number")521 panic("unreached")522}...
start
Using AI Code Generation
1import "fmt"2func main() {3 fmt.Println("Hello, World!")4}5import "fmt"6func init() {7 fmt.Println("Hello, World!")8}9import "fmt"10func main() {11 fmt.Println("Hello, World!")12}13import "fmt"14func init() {15 fmt.Println("Hello, World!")16}17import "fmt"18func main() {19 fmt.Println("Hello, World!")20}21import "fmt"22func init() {23 fmt.Println("Hello, World!")24}25import "fmt"26func main() {27 fmt.Println("Hello, World!")28}29import "fmt"30func init() {31 fmt.Println("Hello, World!")32}33import "fmt"34func main() {35 fmt.Println("Hello, World!")36}37import "fmt"38func init() {39 fmt.Println("Hello, World!")40}41import "fmt"42func main() {43 fmt.Println("Hello, World!")44}45import "fmt"46func init() {
start
Using AI Code Generation
1import (2func main() {3 cmd := exec.Command("ls", "-l")4 stdout, err := cmd.StdoutPipe()5 if err != nil {6 fmt.Println(err)7 os.Exit(1)8 }9 if err := cmd.Start(); err != nil {10 fmt.Println(err)11 os.Exit(1)12 }13 in := bufio.NewScanner(stdout)14 for in.Scan() {15 }16 if err := in.Err(); err != nil {17 fmt.Println("error:", err)18 }19 cmd := exec.Command("ls", "-l")20 stdout, err := cmd.StdoutPipe()21 if err != nil {22 fmt.Println(err)23 os.Exit(1)24 }25 if err := cmd.Start(); err != nil {26 fmt.Println(err)27 os.Exit(1)28 }29 in := bufio.NewScanner(stdout)30 for in.Scan() {31 }32 if err := in.Err(); err != nil {33 fmt.Println("error:", err)34 }35 cmd := exec.Command("ls", "-l")36 stdout, err := cmd.StdoutPipe()37 if err != nil {38 fmt.Println(err)39 os.Exit(1)40 }41 if err := cmd.Start(); err != nil {42 fmt.Println(err)43 os.Exit(1)44 }45 in := bufio.NewScanner(stdout)46 for in.Scan() {47 }
start
Using AI Code Generation
1import (2func main() {3 fmt.Println("a = ", a, "b = ", b)4 fmt.Println("Before swap, memory address of a is", &a, "and of b is", &b)5 fmt.Println("After swap, memory address of a is", &a, "and of b is", &b)6 fmt.Println("a = ", a, "b = ", b)7}8func swap(x *int, y *int) {9 fmt.Println("Inside swap function, memory address of x is", x, "and of y is", y)10 fmt.Println("After swapping, *x = ", *x, "and *y = ", *y)11}12import (13func main() {14 fmt.Println("a = ", a, "b = ", b)15 fmt.Println("Before swap, memory address of a is", &a, "and of b is", &b)16 fmt.Println("After swap, memory address of a is", &a, "and of b is", &b)17 fmt.Println("a = ", a, "b = ", b)18}19func swap(x *int, y *int) {20 fmt.Println("Inside swap function, memory address of x is", x, "and of y is", y)21 fmt.Println("After swapping, *x = ", *x, "and *y = ", *y)22}
start
Using AI Code Generation
1import "fmt"2func main() {3 fmt.Println("Enter a number:")4 fmt.Scanln(&a)5 fmt.Println("You entered:", a)6}7import "fmt"8func main() {9 fmt.Println("Enter a number:")10 fmt.Scanf("%d", &a)11 fmt.Println("You entered:", a)12}13import (14func main() {15 reader := bufio.NewReader(os.Stdin)16 fmt.Println("Enter text:")17 text, _ := reader.ReadString('18 fmt.Println("You entered:", text)19}20import (21func main() {22 reader := bufio.NewReader(os.Stdin)23 fmt.Println("Enter text:")24 text, _ := reader.ReadString('25 fmt.Println("You entered:", text)26}27import (28func main() {29 reader := bufio.NewReader(os.Stdin)30 fmt.Println("Enter text:")31 text, _ := reader.ReadString('32 fmt.Println("You entered:", text[:len(text)-1])33}34import (35func main() {36 reader := bufio.NewReader(os
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!