How to use getExecutions method of executions Package

Best Testkube code snippet using executions.getExecutions

elasticsearch_test.go

Source:elasticsearch_test.go Github

copy

Full Screen

1// The MIT License2//3// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.4//5// Copyright (c) 2020 Uber Technologies, Inc.6//7// Permission is hereby granted, free of charge, to any person obtaining a copy8// of this software and associated documentation files (the "Software"), to deal9// in the Software without restriction, including without limitation the rights10// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell11// copies of the Software, and to permit persons to whom the Software is12// furnished to do so, subject to the following conditions:13//14// The above copyright notice and this permission notice shall be included in15// all copies or substantial portions of the Software.16//17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR18// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,19// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE20// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER21// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,22// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN23// THE SOFTWARE.24//go:build esintegration25// +build esintegration26// to run locally, make sure Elasticsearch is running,27// then run cmd `go test -v ./host -run TestElasticsearchIntegrationSuite -tags esintegration`28package host29import (30 "bytes"31 "context"32 "encoding/json"33 "flag"34 "fmt"35 "strconv"36 "strings"37 "testing"38 "time"39 "github.com/pborman/uuid"40 "github.com/stretchr/testify/require"41 "github.com/stretchr/testify/suite"42 commandpb "go.temporal.io/api/command/v1"43 commonpb "go.temporal.io/api/common/v1"44 enumspb "go.temporal.io/api/enums/v1"45 filterpb "go.temporal.io/api/filter/v1"46 historypb "go.temporal.io/api/history/v1"47 "go.temporal.io/api/serviceerror"48 taskqueuepb "go.temporal.io/api/taskqueue/v1"49 workflowpb "go.temporal.io/api/workflow/v1"50 "go.temporal.io/api/workflowservice/v1"51 "go.temporal.io/server/common/config"52 "go.temporal.io/server/common/log/tag"53 "go.temporal.io/server/common/payload"54 "go.temporal.io/server/common/payloads"55 esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"56 "go.temporal.io/server/common/primitives/timestamp"57 "go.temporal.io/server/common/searchattribute"58)59const (60 numOfRetry = 5061 waitTimeInMs = 40062 waitForESToSettle = 4 * time.Second // wait es shards for some time ensure data consistent63)64type elasticsearchIntegrationSuite struct {65 // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,66 // not merely log an error67 *require.Assertions68 IntegrationBase69 esClient esclient.IntegrationTestsClient70 testSearchAttributeKey string71 testSearchAttributeVal string72}73// This cluster use customized threshold for history config74func (s *elasticsearchIntegrationSuite) SetupSuite() {75 s.setupSuite("testdata/integration_elasticsearch_cluster.yaml")76 s.esClient = CreateESClient(s.Suite, s.testClusterConfig.ESConfig, s.Logger)77 PutIndexTemplate(s.Suite, s.esClient, fmt.Sprintf("testdata/es_%s_index_template.json", s.testClusterConfig.ESConfig.Version), "test-visibility-template")78 indexName := s.testClusterConfig.ESConfig.GetVisibilityIndex()79 CreateIndex(s.Suite, s.esClient, indexName)80 s.putIndexSettings(indexName, defaultTestValueOfESIndexMaxResultWindow)81}82func (s *elasticsearchIntegrationSuite) TearDownSuite() {83 s.tearDownSuite()84 DeleteIndex(s.Suite, s.esClient, s.testClusterConfig.ESConfig.GetVisibilityIndex())85}86func (s *elasticsearchIntegrationSuite) SetupTest() {87 // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil88 s.Assertions = require.New(s.T())89 s.testSearchAttributeKey = "CustomTextField"90 s.testSearchAttributeVal = "test value"91}92func TestElasticsearchIntegrationSuite(t *testing.T) {93 flag.Parse()94 suite.Run(t, new(elasticsearchIntegrationSuite))95}96func (s *elasticsearchIntegrationSuite) TestListOpenWorkflow() {97 id := "es-integration-start-workflow-test"98 wt := "es-integration-start-workflow-test-type"99 tl := "es-integration-start-workflow-test-taskqueue"100 request := s.createStartWorkflowExecutionRequest(id, wt, tl)101 attrPayload, _ := payload.Encode(s.testSearchAttributeVal)102 searchAttr := &commonpb.SearchAttributes{103 IndexedFields: map[string]*commonpb.Payload{104 s.testSearchAttributeKey: attrPayload,105 },106 }107 request.SearchAttributes = searchAttr108 startTime := time.Now().UTC()109 we, err := s.engine.StartWorkflowExecution(NewContext(), request)110 s.NoError(err)111 startFilter := &filterpb.StartTimeFilter{}112 startFilter.EarliestTime = &startTime113 var openExecution *workflowpb.WorkflowExecutionInfo114 for i := 0; i < numOfRetry; i++ {115 startFilter.LatestTime = timestamp.TimePtr(time.Now().UTC())116 resp, err := s.engine.ListOpenWorkflowExecutions(NewContext(), &workflowservice.ListOpenWorkflowExecutionsRequest{117 Namespace: s.namespace,118 MaximumPageSize: defaultTestValueOfESIndexMaxResultWindow,119 StartTimeFilter: startFilter,120 Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_ExecutionFilter{ExecutionFilter: &filterpb.WorkflowExecutionFilter{121 WorkflowId: id,122 }},123 })124 s.NoError(err)125 if len(resp.GetExecutions()) == 1 {126 openExecution = resp.GetExecutions()[0]127 s.Nil(resp.NextPageToken)128 break129 }130 time.Sleep(waitTimeInMs * time.Millisecond)131 }132 s.NotNil(openExecution)133 s.Equal(we.GetRunId(), openExecution.GetExecution().GetRunId())134 s.Equal(1, len(openExecution.GetSearchAttributes().GetIndexedFields()))135 attrPayloadFromResponse, attrExist := openExecution.GetSearchAttributes().GetIndexedFields()[s.testSearchAttributeKey]136 s.True(attrExist)137 s.Equal(attrPayload.GetData(), attrPayloadFromResponse.GetData())138 attrType, typeSet := attrPayloadFromResponse.GetMetadata()[searchattribute.MetadataType]139 s.True(typeSet)140 s.True(len(attrType) > 0)141}142func (s *elasticsearchIntegrationSuite) TestListWorkflow() {143 id := "es-integration-list-workflow-test"144 wt := "es-integration-list-workflow-test-type"145 tl := "es-integration-list-workflow-test-taskqueue"146 request := s.createStartWorkflowExecutionRequest(id, wt, tl)147 we, err := s.engine.StartWorkflowExecution(NewContext(), request)148 s.NoError(err)149 query := fmt.Sprintf(`WorkflowId = "%s"`, id)150 s.testHelperForReadOnce(we.GetRunId(), query, false)151}152func (s *elasticsearchIntegrationSuite) TestListWorkflow_ExecutionTime() {153 id := "es-integration-list-workflow-execution-time-test"154 wt := "es-integration-list-workflow-execution-time-test-type"155 tl := "es-integration-list-workflow-execution-time-test-taskqueue"156 now := time.Now().UTC()157 request := s.createStartWorkflowExecutionRequest(id, wt, tl)158 // Start workflow with ExecutionTime equal to StartTime159 weNonCron, err := s.engine.StartWorkflowExecution(NewContext(), request)160 s.NoError(err)161 cronID := id + "-cron"162 request.CronSchedule = "@every 1m"163 request.WorkflowId = cronID164 // Start workflow with ExecutionTime equal to StartTime + 1 minute (cron delay)165 weCron, err := s.engine.StartWorkflowExecution(NewContext(), request)166 s.NoError(err)167 // <<1s <<1s 1m168 // ----+-----+------------+----------------------------+--------------169 // now nonCronStart cronStart cronExecutionTime170 // =nonCronExecutionTime171 expectedNonCronMaxExecutionTime := now.Add(1 * time.Second) // 1 second for time skew172 expectedCronMaxExecutionTime := now.Add(1 * time.Minute).Add(1 * time.Second) // 1 second for time skew173 // WorkflowId filter is to filter workflows from other tests.174 nonCronQueryNanos := fmt.Sprintf(`(WorkflowId = "%s" or WorkflowId = "%s") AND ExecutionTime < %d`, id, cronID, expectedNonCronMaxExecutionTime.UnixNano())175 s.testHelperForReadOnce(weNonCron.GetRunId(), nonCronQueryNanos, false)176 cronQueryNanos := fmt.Sprintf(`(WorkflowId = "%s" or WorkflowId = "%s") AND ExecutionTime < %d AND ExecutionTime > %d`, id, cronID, expectedCronMaxExecutionTime.UnixNano(), expectedNonCronMaxExecutionTime.UnixNano())177 s.testHelperForReadOnce(weCron.GetRunId(), cronQueryNanos, false)178 nonCronQuery := fmt.Sprintf(`(WorkflowId = "%s" or WorkflowId = "%s") AND ExecutionTime < "%s"`, id, cronID, expectedNonCronMaxExecutionTime.Format(time.RFC3339Nano))179 s.testHelperForReadOnce(weNonCron.GetRunId(), nonCronQuery, false)180 cronQuery := fmt.Sprintf(`(WorkflowId = "%s" or WorkflowId = "%s") AND ExecutionTime < "%s" AND ExecutionTime > "%s"`, id, cronID, expectedCronMaxExecutionTime.Format(time.RFC3339Nano), expectedNonCronMaxExecutionTime.Format(time.RFC3339Nano))181 s.testHelperForReadOnce(weCron.GetRunId(), cronQuery, false)182}183func (s *elasticsearchIntegrationSuite) TestListWorkflow_SearchAttribute() {184 id := "es-integration-list-workflow-by-search-attr-test"185 wt := "es-integration-list-workflow-by-search-attr-test-type"186 tl := "es-integration-list-workflow-by-search-attr-test-taskqueue"187 request := s.createStartWorkflowExecutionRequest(id, wt, tl)188 attrValBytes, _ := payload.Encode(s.testSearchAttributeVal)189 searchAttr := &commonpb.SearchAttributes{190 IndexedFields: map[string]*commonpb.Payload{191 s.testSearchAttributeKey: attrValBytes,192 },193 }194 request.SearchAttributes = searchAttr195 we, err := s.engine.StartWorkflowExecution(NewContext(), request)196 s.NoError(err)197 query := fmt.Sprintf(`WorkflowId = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal)198 s.testHelperForReadOnce(we.GetRunId(), query, false)199 searchAttributes := s.createSearchAttributes()200 // test upsert201 wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,202 previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {203 upsertCommand := &commandpb.Command{204 CommandType: enumspb.COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES,205 Attributes: &commandpb.Command_UpsertWorkflowSearchAttributesCommandAttributes{UpsertWorkflowSearchAttributesCommandAttributes: &commandpb.UpsertWorkflowSearchAttributesCommandAttributes{206 SearchAttributes: searchAttributes,207 }}}208 return []*commandpb.Command{upsertCommand}, nil209 }210 taskQueue := &taskqueuepb.TaskQueue{Name: tl}211 poller := &TaskPoller{212 Engine: s.engine,213 Namespace: s.namespace,214 TaskQueue: taskQueue,215 StickyTaskQueue: taskQueue,216 Identity: "worker1",217 WorkflowTaskHandler: wtHandler,218 Logger: s.Logger,219 T: s.T(),220 }221 _, newTask, err := poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(222 false,223 false,224 true,225 true,226 0,227 1,228 true,229 nil)230 s.NoError(err)231 s.NotNil(newTask)232 s.NotNil(newTask.WorkflowTask)233 time.Sleep(waitForESToSettle)234 listRequest := &workflowservice.ListWorkflowExecutionsRequest{235 Namespace: s.namespace,236 PageSize: int32(2),237 Query: fmt.Sprintf(`WorkflowType = '%s' and ExecutionStatus = 'Running' and BinaryChecksums = 'binary-v1'`, wt),238 }239 // verify upsert data is on ES240 s.testListResultForUpsertSearchAttributes(listRequest)241 // verify DescribeWorkflowExecution242 descRequest := &workflowservice.DescribeWorkflowExecutionRequest{243 Namespace: s.namespace,244 Execution: &commonpb.WorkflowExecution{245 WorkflowId: id,246 },247 }248 descResp, err := s.engine.DescribeWorkflowExecution(NewContext(), descRequest)249 s.NoError(err)250 s.Equal(len(searchAttributes.GetIndexedFields()), len(descResp.WorkflowExecutionInfo.GetSearchAttributes().GetIndexedFields()))251 for attrName, expectedPayload := range searchAttributes.GetIndexedFields() {252 respAttr, ok := descResp.WorkflowExecutionInfo.GetSearchAttributes().GetIndexedFields()[attrName]253 s.True(ok)254 s.Equal(expectedPayload.GetData(), respAttr.GetData())255 attrType, typeSet := respAttr.GetMetadata()[searchattribute.MetadataType]256 s.True(typeSet)257 s.True(len(attrType) > 0)258 }259}260func (s *elasticsearchIntegrationSuite) TestListWorkflow_PageToken() {261 id := "es-integration-list-workflow-token-test"262 wt := "es-integration-list-workflow-token-test-type"263 tl := "es-integration-list-workflow-token-test-taskqueue"264 request := s.createStartWorkflowExecutionRequest(id, wt, tl)265 numOfWorkflows := defaultTestValueOfESIndexMaxResultWindow - 1 // == 4266 pageSize := 3267 s.testListWorkflowHelper(numOfWorkflows, pageSize, request, id, wt, false)268}269func (s *elasticsearchIntegrationSuite) TestListWorkflow_SearchAfter() {270 id := "es-integration-list-workflow-searchAfter-test"271 wt := "es-integration-list-workflow-searchAfter-test-type"272 tl := "es-integration-list-workflow-searchAfter-test-taskqueue"273 request := s.createStartWorkflowExecutionRequest(id, wt, tl)274 numOfWorkflows := defaultTestValueOfESIndexMaxResultWindow + 1 // == 6275 pageSize := 4276 s.testListWorkflowHelper(numOfWorkflows, pageSize, request, id, wt, false)277}278func (s *elasticsearchIntegrationSuite) TestListWorkflow_OrQuery() {279 id := "es-integration-list-workflow-or-query-test"280 wt := "es-integration-list-workflow-or-query-test-type"281 tl := "es-integration-list-workflow-or-query-test-taskqueue"282 request := s.createStartWorkflowExecutionRequest(id, wt, tl)283 // start 3 workflows284 key := "CustomIntField"285 attrValBytes, _ := payload.Encode(1)286 searchAttr := &commonpb.SearchAttributes{287 IndexedFields: map[string]*commonpb.Payload{288 key: attrValBytes,289 },290 }291 request.SearchAttributes = searchAttr292 we1, err := s.engine.StartWorkflowExecution(NewContext(), request)293 s.NoError(err)294 request.RequestId = uuid.New()295 request.WorkflowId = id + "-2"296 attrValBytes, _ = payload.Encode(2)297 searchAttr.IndexedFields[key] = attrValBytes298 we2, err := s.engine.StartWorkflowExecution(NewContext(), request)299 s.NoError(err)300 request.RequestId = uuid.New()301 request.WorkflowId = id + "-3"302 attrValBytes, _ = payload.Encode(3)303 searchAttr.IndexedFields[key] = attrValBytes304 we3, err := s.engine.StartWorkflowExecution(NewContext(), request)305 s.NoError(err)306 time.Sleep(waitForESToSettle)307 // query 1 workflow with search attr308 query1 := fmt.Sprintf(`CustomIntField = %d`, 1)309 var openExecution *workflowpb.WorkflowExecutionInfo310 listRequest := &workflowservice.ListWorkflowExecutionsRequest{311 Namespace: s.namespace,312 PageSize: defaultTestValueOfESIndexMaxResultWindow,313 Query: query1,314 }315 for i := 0; i < numOfRetry; i++ {316 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)317 s.NoError(err)318 if len(resp.GetExecutions()) == 1 {319 openExecution = resp.GetExecutions()[0]320 break321 }322 time.Sleep(waitTimeInMs * time.Millisecond)323 }324 s.NotNil(openExecution)325 s.Equal(we1.GetRunId(), openExecution.GetExecution().GetRunId())326 s.True(!openExecution.GetExecutionTime().Before(*openExecution.GetStartTime()))327 searchValBytes := openExecution.SearchAttributes.GetIndexedFields()[key]328 var searchVal int329 payload.Decode(searchValBytes, &searchVal)330 s.Equal(1, searchVal)331 // query with or clause332 query2 := fmt.Sprintf(`CustomIntField = %d or CustomIntField = %d`, 1, 2)333 listRequest.Query = query2334 var openExecutions []*workflowpb.WorkflowExecutionInfo335 for i := 0; i < numOfRetry; i++ {336 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)337 s.NoError(err)338 if len(resp.GetExecutions()) == 2 {339 openExecutions = resp.GetExecutions()340 break341 }342 time.Sleep(waitTimeInMs * time.Millisecond)343 }344 s.Equal(2, len(openExecutions))345 e1 := openExecutions[0]346 e2 := openExecutions[1]347 if e1.GetExecution().GetRunId() != we1.GetRunId() {348 // results are sorted by [CloseTime,RunID] desc, so find the correct mapping first349 e1, e2 = e2, e1350 }351 s.Equal(we1.GetRunId(), e1.GetExecution().GetRunId())352 s.Equal(we2.GetRunId(), e2.GetExecution().GetRunId())353 searchValBytes = e2.SearchAttributes.GetIndexedFields()[key]354 payload.Decode(searchValBytes, &searchVal)355 s.Equal(2, searchVal)356 // query for open357 query3 := fmt.Sprintf(`(CustomIntField = %d or CustomIntField = %d) and ExecutionStatus = 'Running'`, 2, 3)358 listRequest.Query = query3359 for i := 0; i < numOfRetry; i++ {360 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)361 s.NoError(err)362 if len(resp.GetExecutions()) == 2 {363 openExecutions = resp.GetExecutions()364 break365 }366 time.Sleep(waitTimeInMs * time.Millisecond)367 }368 s.Equal(2, len(openExecutions))369 e1 = openExecutions[0]370 e2 = openExecutions[1]371 s.Equal(we3.GetRunId(), e1.GetExecution().GetRunId())372 s.Equal(we2.GetRunId(), e2.GetExecution().GetRunId())373 searchValBytes = e1.SearchAttributes.GetIndexedFields()[key]374 payload.Decode(searchValBytes, &searchVal)375 s.Equal(3, searchVal)376}377func (s *elasticsearchIntegrationSuite) TestListWorkflow_KeywordQuery() {378 id := "es-integration-list-workflow-keyword-query-test"379 wt := "es-integration-list-workflow-keyword-query-test-type"380 tl := "es-integration-list-workflow-keyword-query-test-taskqueue"381 request := s.createStartWorkflowExecutionRequest(id, wt, tl)382 searchAttr := &commonpb.SearchAttributes{383 IndexedFields: map[string]*commonpb.Payload{384 "CustomKeywordField": payload.EncodeString("justice for all"),385 },386 }387 request.SearchAttributes = searchAttr388 we1, err := s.engine.StartWorkflowExecution(NewContext(), request)389 s.NoError(err)390 time.Sleep(waitForESToSettle)391 // Exact match Keyword (supported)392 var openExecution *workflowpb.WorkflowExecutionInfo393 listRequest := &workflowservice.ListWorkflowExecutionsRequest{394 Namespace: s.namespace,395 PageSize: defaultTestValueOfESIndexMaxResultWindow,396 Query: `CustomKeywordField = "justice for all"`,397 }398 for i := 0; i < numOfRetry; i++ {399 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)400 s.NoError(err)401 if len(resp.GetExecutions()) == 1 {402 openExecution = resp.GetExecutions()[0]403 break404 }405 time.Sleep(waitTimeInMs * time.Millisecond)406 }407 s.NotNil(openExecution)408 s.Equal(we1.GetRunId(), openExecution.GetExecution().GetRunId())409 s.True(!openExecution.GetExecutionTime().Before(*openExecution.GetStartTime()))410 saPayload := openExecution.SearchAttributes.GetIndexedFields()["CustomKeywordField"]411 var saValue string412 err = payload.Decode(saPayload, &saValue)413 s.NoError(err)414 s.Equal("justice for all", saValue)415 // Partial match on Keyword (not supported)416 listRequest = &workflowservice.ListWorkflowExecutionsRequest{417 Namespace: s.namespace,418 PageSize: defaultTestValueOfESIndexMaxResultWindow,419 Query: `CustomKeywordField = "justice"`,420 }421 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)422 s.NoError(err)423 s.Len(resp.GetExecutions(), 0)424 // Inordered match on Keyword (not supported)425 listRequest = &workflowservice.ListWorkflowExecutionsRequest{426 Namespace: s.namespace,427 PageSize: defaultTestValueOfESIndexMaxResultWindow,428 Query: `CustomKeywordField = "all for justice"`,429 }430 resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)431 s.NoError(err)432 s.Len(resp.GetExecutions(), 0)433 // LIKE exact match on Keyword (supported)434 listRequest = &workflowservice.ListWorkflowExecutionsRequest{435 Namespace: s.namespace,436 PageSize: defaultTestValueOfESIndexMaxResultWindow,437 Query: `CustomKeywordField LIKE "%justice for all%"`,438 }439 resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)440 s.NoError(err)441 s.Len(resp.GetExecutions(), 1)442 // LIKE %word% on Keyword (not supported)443 listRequest = &workflowservice.ListWorkflowExecutionsRequest{444 Namespace: s.namespace,445 PageSize: defaultTestValueOfESIndexMaxResultWindow,446 Query: `CustomKeywordField LIKE "%justice%"`,447 }448 resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)449 s.NoError(err)450 s.Len(resp.GetExecutions(), 0)451 // LIKE %chars% on Keyword (not supported)452 listRequest = &workflowservice.ListWorkflowExecutionsRequest{453 Namespace: s.namespace,454 PageSize: defaultTestValueOfESIndexMaxResultWindow,455 Query: `CustomKeywordField LIKE "%ice%"`,456 }457 resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)458 s.NoError(err)459 s.Len(resp.GetExecutions(), 0)460 // LIKE NOT %chars% on Keyword (not supported)461 listRequest = &workflowservice.ListWorkflowExecutionsRequest{462 Namespace: s.namespace,463 PageSize: defaultTestValueOfESIndexMaxResultWindow,464 Query: `CustomKeywordField NOT LIKE "%ice%"`,465 }466 resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)467 s.NoError(err)468 executionCount := 0469 for _, execution := range resp.GetExecutions() {470 saPayload := execution.SearchAttributes.GetIndexedFields()["CustomKeywordField"]471 var saValue string472 err = payload.Decode(saPayload, &saValue)473 s.NoError(err)474 if strings.Contains(saValue, "ice") {475 executionCount++ // execution will be found because NOT LIKE is not supported.476 }477 }478 s.Equal(executionCount, 1)479}480func (s *elasticsearchIntegrationSuite) TestListWorkflow_StringQuery() {481 id := "es-integration-list-workflow-string-query-test"482 wt := "es-integration-list-workflow-string-query-test-type"483 tl := "es-integration-list-workflow-string-query-test-taskqueue"484 request := s.createStartWorkflowExecutionRequest(id, wt, tl)485 searchAttr := &commonpb.SearchAttributes{486 IndexedFields: map[string]*commonpb.Payload{487 "CustomTextField": payload.EncodeString("nothing else matters"),488 },489 }490 request.SearchAttributes = searchAttr491 we1, err := s.engine.StartWorkflowExecution(NewContext(), request)492 s.NoError(err)493 time.Sleep(waitForESToSettle)494 // Exact match String (supported)495 var openExecution *workflowpb.WorkflowExecutionInfo496 listRequest := &workflowservice.ListWorkflowExecutionsRequest{497 Namespace: s.namespace,498 PageSize: defaultTestValueOfESIndexMaxResultWindow,499 Query: `CustomTextField = "nothing else matters"`,500 }501 for i := 0; i < numOfRetry; i++ {502 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)503 s.NoError(err)504 if len(resp.GetExecutions()) == 1 {505 openExecution = resp.GetExecutions()[0]506 break507 }508 time.Sleep(waitTimeInMs * time.Millisecond)509 }510 s.NotNil(openExecution)511 s.Equal(we1.GetRunId(), openExecution.GetExecution().GetRunId())512 s.True(!openExecution.GetExecutionTime().Before(*openExecution.GetStartTime()))513 saPayload := openExecution.SearchAttributes.GetIndexedFields()["CustomTextField"]514 var saValue string515 err = payload.Decode(saPayload, &saValue)516 s.NoError(err)517 s.Equal("nothing else matters", saValue)518 // Partial match on String (supported)519 listRequest = &workflowservice.ListWorkflowExecutionsRequest{520 Namespace: s.namespace,521 PageSize: defaultTestValueOfESIndexMaxResultWindow,522 Query: `CustomTextField = "nothing"`,523 }524 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)525 s.NoError(err)526 s.Len(resp.GetExecutions(), 1)527 // Inordered match on String (supported)528 listRequest = &workflowservice.ListWorkflowExecutionsRequest{529 Namespace: s.namespace,530 PageSize: defaultTestValueOfESIndexMaxResultWindow,531 Query: `CustomTextField = "else nothing matters"`,532 }533 resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)534 s.NoError(err)535 s.Len(resp.GetExecutions(), 1)536 // LIKE %word% on String (supported)537 listRequest = &workflowservice.ListWorkflowExecutionsRequest{538 Namespace: s.namespace,539 PageSize: defaultTestValueOfESIndexMaxResultWindow,540 Query: `CustomTextField LIKE "%else%"`,541 }542 resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)543 s.NoError(err)544 s.Len(resp.GetExecutions(), 1)545 // LIKE word on String (supported)546 listRequest = &workflowservice.ListWorkflowExecutionsRequest{547 Namespace: s.namespace,548 PageSize: defaultTestValueOfESIndexMaxResultWindow,549 Query: `CustomTextField LIKE "else"`, // Same as previous because % just removed for LIKE queries.550 }551 resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)552 s.NoError(err)553 s.Len(resp.GetExecutions(), 1)554 // LIKE %chars% on String (not supported)555 listRequest = &workflowservice.ListWorkflowExecutionsRequest{556 Namespace: s.namespace,557 PageSize: defaultTestValueOfESIndexMaxResultWindow,558 Query: `CustomTextField LIKE "%ls%"`,559 }560 resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)561 s.NoError(err)562 s.Len(resp.GetExecutions(), 0)563 // LIKE NOT %word% on String (supported)564 listRequest = &workflowservice.ListWorkflowExecutionsRequest{565 Namespace: s.namespace,566 PageSize: defaultTestValueOfESIndexMaxResultWindow,567 Query: `CustomTextField NOT LIKE "%else%"`,568 }569 resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)570 s.NoError(err)571 executionCount := 0572 for _, execution := range resp.GetExecutions() {573 saPayload := execution.SearchAttributes.GetIndexedFields()["CustomTextField"]574 var saValue string575 err = payload.Decode(saPayload, &saValue)576 s.NoError(err)577 if strings.Contains(saValue, "else") {578 executionCount++579 }580 }581 s.Equal(executionCount, 0)582}583// To test last page search trigger max window size error584func (s *elasticsearchIntegrationSuite) TestListWorkflow_MaxWindowSize() {585 id := "es-integration-list-workflow-max-window-size-test"586 wt := "es-integration-list-workflow-max-window-size-test-type"587 tl := "es-integration-list-workflow-max-window-size-test-taskqueue"588 startRequest := s.createStartWorkflowExecutionRequest(id, wt, tl)589 for i := 0; i < defaultTestValueOfESIndexMaxResultWindow; i++ {590 startRequest.RequestId = uuid.New()591 startRequest.WorkflowId = id + strconv.Itoa(i)592 _, err := s.engine.StartWorkflowExecution(NewContext(), startRequest)593 s.NoError(err)594 }595 time.Sleep(waitForESToSettle)596 var listResp *workflowservice.ListWorkflowExecutionsResponse597 var nextPageToken []byte598 listRequest := &workflowservice.ListWorkflowExecutionsRequest{599 Namespace: s.namespace,600 PageSize: int32(defaultTestValueOfESIndexMaxResultWindow),601 NextPageToken: nextPageToken,602 Query: fmt.Sprintf(`WorkflowType = '%s' and ExecutionStatus = "Running"`, wt),603 }604 // get first page605 for i := 0; i < numOfRetry; i++ {606 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)607 s.NoError(err)608 if len(resp.GetExecutions()) == defaultTestValueOfESIndexMaxResultWindow {609 listResp = resp610 break611 }612 time.Sleep(waitTimeInMs * time.Millisecond)613 }614 s.NotNil(listResp)615 s.True(len(listResp.GetNextPageToken()) != 0)616 // the last request617 listRequest.NextPageToken = listResp.GetNextPageToken()618 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)619 s.NoError(err)620 s.True(len(resp.GetExecutions()) == 0)621 s.Nil(resp.GetNextPageToken())622}623func (s *elasticsearchIntegrationSuite) TestListWorkflow_OrderBy() {624 id := "es-integration-list-workflow-order-by-test"625 wt := "es-integration-list-workflow-order-by-test-type"626 tl := "es-integration-list-workflow-order-by-test-taskqueue"627 startRequest := s.createStartWorkflowExecutionRequest(id, wt, tl)628 for i := 0; i < defaultTestValueOfESIndexMaxResultWindow+1; i++ { // start 6629 startRequest.RequestId = uuid.New()630 startRequest.WorkflowId = id + strconv.Itoa(i)631 if i < defaultTestValueOfESIndexMaxResultWindow-1 { // 4 workflow has search attr632 intVal, _ := payload.Encode(i)633 doubleVal, _ := payload.Encode(float64(i))634 strVal, _ := payload.Encode(strconv.Itoa(i))635 timeVal, _ := payload.Encode(time.Now().UTC())636 searchAttr := &commonpb.SearchAttributes{637 IndexedFields: map[string]*commonpb.Payload{638 "CustomIntField": intVal,639 "CustomDoubleField": doubleVal,640 "CustomKeywordField": strVal,641 "CustomDatetimeField": timeVal,642 },643 }644 startRequest.SearchAttributes = searchAttr645 } else {646 startRequest.SearchAttributes = &commonpb.SearchAttributes{}647 }648 _, err := s.engine.StartWorkflowExecution(NewContext(), startRequest)649 s.NoError(err)650 }651 time.Sleep(waitForESToSettle)652 desc := "desc"653 asc := "asc"654 queryTemplate := `WorkflowType = "%s" order by %s %s`655 pageSize := int32(defaultTestValueOfESIndexMaxResultWindow)656 // order by CloseTime asc657 query1 := fmt.Sprintf(queryTemplate, wt, searchattribute.CloseTime, asc)658 var openExecutions []*workflowpb.WorkflowExecutionInfo659 listRequest := &workflowservice.ListWorkflowExecutionsRequest{660 Namespace: s.namespace,661 PageSize: pageSize,662 Query: query1,663 }664 for i := 0; i < numOfRetry; i++ {665 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)666 s.NoError(err)667 if int32(len(resp.GetExecutions())) == listRequest.GetPageSize() {668 openExecutions = resp.GetExecutions()669 break670 }671 time.Sleep(waitTimeInMs * time.Millisecond)672 }673 s.NotNil(openExecutions)674 for i := int32(1); i < pageSize; i++ {675 e1 := openExecutions[i-1]676 e2 := openExecutions[i]677 if e2.GetCloseTime() != nil {678 s.NotEqual(time.Time{}, *e1.GetCloseTime())679 s.GreaterOrEqual(e2.GetCloseTime(), e1.GetCloseTime())680 }681 }682 // greatest effort to reduce duplicate code683 testHelper := func(query, searchAttrKey string, prevVal, currVal interface{}) {684 listRequest.Query = query685 listRequest.NextPageToken = []byte{}686 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)687 s.NoError(err)688 openExecutions = resp.GetExecutions()689 dec := json.NewDecoder(bytes.NewReader(openExecutions[0].GetSearchAttributes().GetIndexedFields()[searchAttrKey].GetData()))690 dec.UseNumber()691 err = dec.Decode(&prevVal)692 s.NoError(err)693 for i := int32(1); i < pageSize; i++ {694 indexedFields := openExecutions[i].GetSearchAttributes().GetIndexedFields()695 searchAttrBytes, ok := indexedFields[searchAttrKey]696 if !ok { // last one doesn't have search attr697 s.Equal(pageSize-1, i)698 break699 }700 dec := json.NewDecoder(bytes.NewReader(searchAttrBytes.GetData()))701 dec.UseNumber()702 err = dec.Decode(&currVal)703 s.NoError(err)704 var v1, v2 interface{}705 switch searchAttrKey {706 case "CustomIntField":707 v1, _ = prevVal.(json.Number).Int64()708 v2, _ = currVal.(json.Number).Int64()709 s.True(v1.(int64) >= v2.(int64))710 case "CustomDoubleField":711 v1, _ = prevVal.(json.Number).Float64()712 v2, _ = currVal.(json.Number).Float64()713 s.True(v1.(float64) >= v2.(float64))714 case "CustomKeywordField":715 s.True(prevVal.(string) >= currVal.(string))716 case "CustomDatetimeField":717 v1, _ = time.Parse(time.RFC3339Nano, prevVal.(string))718 v2, _ = time.Parse(time.RFC3339Nano, currVal.(string))719 s.True(v1.(time.Time).After(v2.(time.Time)))720 }721 prevVal = currVal722 }723 listRequest.NextPageToken = resp.GetNextPageToken()724 resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest) // last page725 s.NoError(err)726 s.Equal(1, len(resp.GetExecutions()))727 }728 // order by CustomIntField desc729 field := "CustomIntField"730 query := fmt.Sprintf(queryTemplate, wt, field, desc)731 var int1, int2 int732 testHelper(query, field, int1, int2)733 // order by CustomDoubleField desc734 field = "CustomDoubleField"735 query = fmt.Sprintf(queryTemplate, wt, field, desc)736 var double1, double2 float64737 testHelper(query, field, double1, double2)738 // order by CustomKeywordField desc739 field = "CustomKeywordField"740 query = fmt.Sprintf(queryTemplate, wt, field, desc)741 var s1, s2 string742 testHelper(query, field, s1, s2)743 // order by CustomDatetimeField desc744 field = "CustomDatetimeField"745 query = fmt.Sprintf(queryTemplate, wt, field, desc)746 var t1, t2 time.Time747 testHelper(query, field, t1, t2)748}749func (s *elasticsearchIntegrationSuite) testListWorkflowHelper(numOfWorkflows, pageSize int,750 startRequest *workflowservice.StartWorkflowExecutionRequest, wid, wType string, isScan bool) {751 // start enough number of workflows752 for i := 0; i < numOfWorkflows; i++ {753 startRequest.RequestId = uuid.New()754 startRequest.WorkflowId = wid + strconv.Itoa(i)755 _, err := s.engine.StartWorkflowExecution(NewContext(), startRequest)756 s.NoError(err)757 }758 time.Sleep(waitForESToSettle)759 var openExecutions []*workflowpb.WorkflowExecutionInfo760 var nextPageToken []byte761 listRequest := &workflowservice.ListWorkflowExecutionsRequest{762 Namespace: s.namespace,763 PageSize: int32(pageSize),764 NextPageToken: nextPageToken,765 Query: fmt.Sprintf(`WorkflowType = '%s' and ExecutionStatus = 'Running'`, wType),766 }767 scanRequest := &workflowservice.ScanWorkflowExecutionsRequest{768 Namespace: s.namespace,769 PageSize: int32(pageSize),770 NextPageToken: nextPageToken,771 Query: fmt.Sprintf(`WorkflowType = '%s' and ExecutionStatus = 'Running'`, wType),772 }773 // test first page774 for i := 0; i < numOfRetry; i++ {775 if isScan {776 scanResponse, err := s.engine.ScanWorkflowExecutions(NewContext(), scanRequest)777 s.NoError(err)778 if len(scanResponse.GetExecutions()) == pageSize {779 openExecutions = scanResponse.GetExecutions()780 nextPageToken = scanResponse.GetNextPageToken()781 break782 }783 } else {784 listResponse, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)785 s.NoError(err)786 if len(listResponse.GetExecutions()) == pageSize {787 openExecutions = listResponse.GetExecutions()788 nextPageToken = listResponse.GetNextPageToken()789 break790 }791 }792 time.Sleep(waitTimeInMs * time.Millisecond)793 }794 s.NotNil(openExecutions)795 s.NotNil(nextPageToken)796 s.True(len(nextPageToken) > 0)797 // test last page798 listRequest.NextPageToken = nextPageToken799 scanRequest.NextPageToken = nextPageToken800 inIf := false801 for i := 0; i < numOfRetry; i++ {802 if isScan {803 scanResponse, err := s.engine.ScanWorkflowExecutions(NewContext(), scanRequest)804 s.NoError(err)805 if len(scanResponse.GetExecutions()) == numOfWorkflows-pageSize {806 inIf = true807 openExecutions = scanResponse.GetExecutions()808 nextPageToken = scanResponse.GetNextPageToken()809 break810 }811 } else {812 listResponse, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)813 s.NoError(err)814 if len(listResponse.GetExecutions()) == numOfWorkflows-pageSize {815 inIf = true816 openExecutions = listResponse.GetExecutions()817 nextPageToken = listResponse.GetNextPageToken()818 break819 }820 }821 time.Sleep(waitTimeInMs * time.Millisecond)822 }823 s.True(inIf)824 s.NotNil(openExecutions)825 s.Nil(nextPageToken)826}827func (s *elasticsearchIntegrationSuite) testHelperForReadOnce(expectedRunID string, query string, isScan bool) {828 var openExecution *workflowpb.WorkflowExecutionInfo829 listRequest := &workflowservice.ListWorkflowExecutionsRequest{830 Namespace: s.namespace,831 PageSize: defaultTestValueOfESIndexMaxResultWindow,832 Query: query,833 }834 scanRequest := &workflowservice.ScanWorkflowExecutionsRequest{835 Namespace: s.namespace,836 PageSize: defaultTestValueOfESIndexMaxResultWindow,837 Query: query,838 }839 for i := 0; i < numOfRetry; i++ {840 if isScan {841 scanResponse, err := s.engine.ScanWorkflowExecutions(NewContext(), scanRequest)842 s.NoError(err)843 if len(scanResponse.GetExecutions()) == 1 {844 openExecution = scanResponse.GetExecutions()[0]845 s.Nil(scanResponse.NextPageToken)846 break847 }848 } else {849 listResponse, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)850 s.NoError(err)851 if len(listResponse.GetExecutions()) == 1 {852 openExecution = listResponse.GetExecutions()[0]853 s.Nil(listResponse.NextPageToken)854 break855 }856 }857 time.Sleep(waitTimeInMs * time.Millisecond)858 }859 s.NotNil(openExecution)860 s.Equal(expectedRunID, openExecution.GetExecution().GetRunId())861 s.True(!openExecution.GetExecutionTime().Before(*openExecution.GetStartTime()))862 if openExecution.SearchAttributes != nil && len(openExecution.SearchAttributes.GetIndexedFields()) > 0 {863 searchValBytes := openExecution.SearchAttributes.GetIndexedFields()[s.testSearchAttributeKey]864 var searchVal string865 payload.Decode(searchValBytes, &searchVal)866 s.Equal(s.testSearchAttributeVal, searchVal)867 }868}869func (s *elasticsearchIntegrationSuite) TestScanWorkflow() {870 id := "es-integration-scan-workflow-test"871 wt := "es-integration-scan-workflow-test-type"872 tl := "es-integration-scan-workflow-test-taskqueue"873 identity := "worker1"874 workflowType := &commonpb.WorkflowType{Name: wt}875 taskQueue := &taskqueuepb.TaskQueue{Name: tl}876 request := &workflowservice.StartWorkflowExecutionRequest{877 RequestId: uuid.New(),878 Namespace: s.namespace,879 WorkflowId: id,880 WorkflowType: workflowType,881 TaskQueue: taskQueue,882 Input: nil,883 WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),884 WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),885 Identity: identity,886 }887 we, err := s.engine.StartWorkflowExecution(NewContext(), request)888 s.NoError(err)889 query := fmt.Sprintf(`WorkflowId = "%s"`, id)890 s.testHelperForReadOnce(we.GetRunId(), query, true)891}892func (s *elasticsearchIntegrationSuite) TestScanWorkflow_SearchAttribute() {893 id := "es-integration-scan-workflow-search-attr-test"894 wt := "es-integration-scan-workflow-search-attr-test-type"895 tl := "es-integration-scan-workflow-search-attr-test-taskqueue"896 request := s.createStartWorkflowExecutionRequest(id, wt, tl)897 attrValBytes, _ := payload.Encode(s.testSearchAttributeVal)898 searchAttr := &commonpb.SearchAttributes{899 IndexedFields: map[string]*commonpb.Payload{900 s.testSearchAttributeKey: attrValBytes,901 },902 }903 request.SearchAttributes = searchAttr904 we, err := s.engine.StartWorkflowExecution(NewContext(), request)905 s.NoError(err)906 query := fmt.Sprintf(`WorkflowId = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal)907 s.testHelperForReadOnce(we.GetRunId(), query, true)908}909func (s *elasticsearchIntegrationSuite) TestScanWorkflow_PageToken() {910 id := "es-integration-scan-workflow-token-test"911 wt := "es-integration-scan-workflow-token-test-type"912 tl := "es-integration-scan-workflow-token-test-taskqueue"913 identity := "worker1"914 workflowType := &commonpb.WorkflowType{Name: wt}915 taskQueue := &taskqueuepb.TaskQueue{Name: tl}916 request := &workflowservice.StartWorkflowExecutionRequest{917 Namespace: s.namespace,918 WorkflowType: workflowType,919 TaskQueue: taskQueue,920 Input: nil,921 WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),922 WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),923 Identity: identity,924 }925 numOfWorkflows := 4926 pageSize := 3927 s.testListWorkflowHelper(numOfWorkflows, pageSize, request, id, wt, true)928}929func (s *elasticsearchIntegrationSuite) TestCountWorkflow() {930 id := "es-integration-count-workflow-test"931 wt := "es-integration-count-workflow-test-type"932 tl := "es-integration-count-workflow-test-taskqueue"933 request := s.createStartWorkflowExecutionRequest(id, wt, tl)934 attrValBytes, _ := payload.Encode(s.testSearchAttributeVal)935 searchAttr := &commonpb.SearchAttributes{936 IndexedFields: map[string]*commonpb.Payload{937 s.testSearchAttributeKey: attrValBytes,938 },939 }940 request.SearchAttributes = searchAttr941 _, err := s.engine.StartWorkflowExecution(NewContext(), request)942 s.NoError(err)943 query := fmt.Sprintf(`WorkflowId = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal)944 countRequest := &workflowservice.CountWorkflowExecutionsRequest{945 Namespace: s.namespace,946 Query: query,947 }948 var resp *workflowservice.CountWorkflowExecutionsResponse949 for i := 0; i < numOfRetry; i++ {950 resp, err = s.engine.CountWorkflowExecutions(NewContext(), countRequest)951 s.NoError(err)952 if resp.GetCount() == int64(1) {953 break954 }955 time.Sleep(waitTimeInMs * time.Millisecond)956 }957 s.Equal(int64(1), resp.GetCount())958 query = fmt.Sprintf(`WorkflowId = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, "noMatch")959 countRequest.Query = query960 resp, err = s.engine.CountWorkflowExecutions(NewContext(), countRequest)961 s.NoError(err)962 s.Equal(int64(0), resp.GetCount())963}964func (s *elasticsearchIntegrationSuite) createStartWorkflowExecutionRequest(id, wt, tl string) *workflowservice.StartWorkflowExecutionRequest {965 identity := "worker1"966 workflowType := &commonpb.WorkflowType{Name: wt}967 taskQueue := &taskqueuepb.TaskQueue{Name: tl}968 request := &workflowservice.StartWorkflowExecutionRequest{969 RequestId: uuid.New(),970 Namespace: s.namespace,971 WorkflowId: id,972 WorkflowType: workflowType,973 TaskQueue: taskQueue,974 Input: nil,975 WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),976 WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),977 Identity: identity,978 }979 return request980}981func (s *elasticsearchIntegrationSuite) TestUpsertWorkflowExecution() {982 id := "es-integration-upsert-workflow-test"983 wt := "es-integration-upsert-workflow-test-type"984 tl := "es-integration-upsert-workflow-test-taskqueue"985 identity := "worker1"986 workflowType := &commonpb.WorkflowType{Name: wt}987 taskQueue := &taskqueuepb.TaskQueue{Name: tl}988 request := &workflowservice.StartWorkflowExecutionRequest{989 RequestId: uuid.New(),990 Namespace: s.namespace,991 WorkflowId: id,992 WorkflowType: workflowType,993 TaskQueue: taskQueue,994 Input: nil,995 WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),996 WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),997 Identity: identity,998 }999 we, err0 := s.engine.StartWorkflowExecution(NewContext(), request)1000 s.NoError(err0)1001 s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))1002 commandCount := 01003 wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,1004 previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {1005 upsertCommand := &commandpb.Command{1006 CommandType: enumspb.COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES,1007 Attributes: &commandpb.Command_UpsertWorkflowSearchAttributesCommandAttributes{UpsertWorkflowSearchAttributesCommandAttributes: &commandpb.UpsertWorkflowSearchAttributesCommandAttributes{}}}1008 // handle first upsert1009 if commandCount == 0 {1010 commandCount++1011 attrValPayload, _ := payload.Encode(s.testSearchAttributeVal)1012 upsertSearchAttr := &commonpb.SearchAttributes{1013 IndexedFields: map[string]*commonpb.Payload{1014 s.testSearchAttributeKey: attrValPayload,1015 },1016 }1017 upsertCommand.GetUpsertWorkflowSearchAttributesCommandAttributes().SearchAttributes = upsertSearchAttr1018 return []*commandpb.Command{upsertCommand}, nil1019 }1020 // handle second upsert, which update existing field and add new field1021 if commandCount == 1 {1022 commandCount++1023 upsertCommand.GetUpsertWorkflowSearchAttributesCommandAttributes().SearchAttributes = s.createSearchAttributes()1024 return []*commandpb.Command{upsertCommand}, nil1025 }1026 return []*commandpb.Command{{1027 CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,1028 Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{1029 Result: payloads.EncodeString("Done"),1030 }},1031 }}, nil1032 }1033 poller := &TaskPoller{1034 Engine: s.engine,1035 Namespace: s.namespace,1036 TaskQueue: taskQueue,1037 StickyTaskQueue: taskQueue,1038 Identity: identity,1039 WorkflowTaskHandler: wtHandler,1040 Logger: s.Logger,1041 T: s.T(),1042 }1043 // process 1st workflow task and assert workflow task is handled correctly.1044 _, newTask, err := poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(1045 false,1046 false,1047 true,1048 true,1049 0,1050 1,1051 true,1052 nil)1053 s.NoError(err)1054 s.NotNil(newTask)1055 s.NotNil(newTask.WorkflowTask)1056 s.Equal(int64(3), newTask.WorkflowTask.GetPreviousStartedEventId())1057 s.Equal(int64(7), newTask.WorkflowTask.GetStartedEventId())1058 s.Equal(4, len(newTask.WorkflowTask.History.Events))1059 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED, newTask.WorkflowTask.History.Events[0].GetEventType())1060 s.Equal(enumspb.EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES, newTask.WorkflowTask.History.Events[1].GetEventType())1061 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, newTask.WorkflowTask.History.Events[2].GetEventType())1062 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, newTask.WorkflowTask.History.Events[3].GetEventType())1063 time.Sleep(waitForESToSettle)1064 // verify upsert data is on ES1065 listRequest := &workflowservice.ListWorkflowExecutionsRequest{1066 Namespace: s.namespace,1067 PageSize: int32(2),1068 Query: fmt.Sprintf(`WorkflowType = '%s' and ExecutionStatus = 'Running'`, wt),1069 }1070 verified := false1071 for i := 0; i < numOfRetry; i++ {1072 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)1073 s.NoError(err)1074 if len(resp.GetExecutions()) == 1 {1075 execution := resp.GetExecutions()[0]1076 retrievedSearchAttr := execution.SearchAttributes1077 if retrievedSearchAttr != nil && len(retrievedSearchAttr.GetIndexedFields()) > 0 {1078 searchValBytes := retrievedSearchAttr.GetIndexedFields()[s.testSearchAttributeKey]1079 var searchVal string1080 err = payload.Decode(searchValBytes, &searchVal)1081 s.NoError(err)1082 s.Equal(s.testSearchAttributeVal, searchVal)1083 verified = true1084 break1085 }1086 }1087 time.Sleep(waitTimeInMs * time.Millisecond)1088 }1089 s.True(verified)1090 // process 2nd workflow task and assert workflow task is handled correctly.1091 _, newTask, err = poller.PollAndProcessWorkflowTaskWithAttemptAndRetryAndForceNewWorkflowTask(1092 false,1093 false,1094 true,1095 true,1096 0,1097 1,1098 true,1099 nil)1100 s.NoError(err)1101 s.NotNil(newTask)1102 s.NotNil(newTask.WorkflowTask)1103 s.Equal(4, len(newTask.WorkflowTask.History.Events))1104 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED, newTask.WorkflowTask.History.Events[0].GetEventType())1105 s.Equal(enumspb.EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES, newTask.WorkflowTask.History.Events[1].GetEventType())1106 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, newTask.WorkflowTask.History.Events[2].GetEventType())1107 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, newTask.WorkflowTask.History.Events[3].GetEventType())1108 time.Sleep(waitForESToSettle)1109 // verify upsert data is on ES1110 s.testListResultForUpsertSearchAttributes(listRequest)1111}1112func (s *elasticsearchIntegrationSuite) testListResultForUpsertSearchAttributes(listRequest *workflowservice.ListWorkflowExecutionsRequest) {1113 verified := false1114 for i := 0; i < numOfRetry; i++ {1115 resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)1116 s.NoError(err)1117 if len(resp.GetExecutions()) == 1 {1118 s.Nil(resp.NextPageToken)1119 execution := resp.GetExecutions()[0]1120 retrievedSearchAttr := execution.SearchAttributes1121 if retrievedSearchAttr != nil && len(retrievedSearchAttr.GetIndexedFields()) == 4 {1122 fields := retrievedSearchAttr.GetIndexedFields()1123 searchValBytes := fields[s.testSearchAttributeKey]1124 var searchVal string1125 err := payload.Decode(searchValBytes, &searchVal)1126 s.NoError(err)1127 s.Equal("another string", searchVal)1128 searchValBytes2 := fields["CustomIntField"]1129 var searchVal2 int1130 err = payload.Decode(searchValBytes2, &searchVal2)1131 s.NoError(err)1132 s.Equal(123, searchVal2)1133 doublePayload := fields["CustomDoubleField"]1134 var doubleVal float641135 err = payload.Decode(doublePayload, &doubleVal)1136 s.NoError(err)1137 s.Equal(22.0878, doubleVal)1138 binaryChecksumsBytes := fields[searchattribute.BinaryChecksums]1139 var binaryChecksums []string1140 err = payload.Decode(binaryChecksumsBytes, &binaryChecksums)1141 s.NoError(err)1142 s.Equal([]string{"binary-v1", "binary-v2"}, binaryChecksums)1143 verified = true1144 break1145 }1146 }1147 time.Sleep(waitTimeInMs * time.Millisecond)1148 }1149 s.True(verified)1150}1151func (s *elasticsearchIntegrationSuite) createSearchAttributes() *commonpb.SearchAttributes {1152 searchAttributes, err := searchattribute.Encode(map[string]interface{}{1153 "CustomTextField": "another string",1154 "CustomIntField": 123,1155 "CustomDoubleField": 22.0878,1156 searchattribute.BinaryChecksums: []string{"binary-v1", "binary-v2"},1157 }, nil)1158 s.NoError(err)1159 return searchAttributes1160}1161func (s *elasticsearchIntegrationSuite) TestUpsertWorkflowExecution_InvalidKey() {1162 id := "es-integration-upsert-workflow-failed-test"1163 wt := "es-integration-upsert-workflow-failed-test-type"1164 tl := "es-integration-upsert-workflow-failed-test-taskqueue"1165 identity := "worker1"1166 workflowType := &commonpb.WorkflowType{Name: wt}1167 taskQueue := &taskqueuepb.TaskQueue{Name: tl}1168 request := &workflowservice.StartWorkflowExecutionRequest{1169 RequestId: uuid.New(),1170 Namespace: s.namespace,1171 WorkflowId: id,1172 WorkflowType: workflowType,1173 TaskQueue: taskQueue,1174 Input: nil,1175 WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),1176 WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),1177 Identity: identity,1178 }1179 we, err0 := s.engine.StartWorkflowExecution(NewContext(), request)1180 s.NoError(err0)1181 s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))1182 wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,1183 previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {1184 upsertCommand := &commandpb.Command{1185 CommandType: enumspb.COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES,1186 Attributes: &commandpb.Command_UpsertWorkflowSearchAttributesCommandAttributes{UpsertWorkflowSearchAttributesCommandAttributes: &commandpb.UpsertWorkflowSearchAttributesCommandAttributes{1187 SearchAttributes: &commonpb.SearchAttributes{1188 IndexedFields: map[string]*commonpb.Payload{1189 "INVALIDKEY": payload.EncodeBytes([]byte("1")),1190 },1191 },1192 }}}1193 return []*commandpb.Command{upsertCommand}, nil1194 }1195 poller := &TaskPoller{1196 Engine: s.engine,1197 Namespace: s.namespace,1198 TaskQueue: taskQueue,1199 StickyTaskQueue: taskQueue,1200 Identity: identity,1201 WorkflowTaskHandler: wtHandler,1202 Logger: s.Logger,1203 T: s.T(),1204 }1205 _, err := poller.PollAndProcessWorkflowTask(false, false)1206 s.Error(err)1207 s.IsType(&serviceerror.InvalidArgument{}, err)1208 s.Equal("BadSearchAttributes: search attribute INVALIDKEY is not defined", err.Error())1209 historyResponse, err := s.engine.GetWorkflowExecutionHistory(NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{1210 Namespace: s.namespace,1211 Execution: &commonpb.WorkflowExecution{1212 WorkflowId: id,1213 RunId: we.RunId,1214 },1215 })1216 s.NoError(err)1217 history := historyResponse.History1218 workflowTaskFailedEvent := history.GetEvents()[3]1219 s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED, workflowTaskFailedEvent.GetEventType())1220 failedEventAttr := workflowTaskFailedEvent.GetWorkflowTaskFailedEventAttributes()1221 s.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SEARCH_ATTRIBUTES, failedEventAttr.GetCause())1222 s.NotNil(failedEventAttr.GetFailure())1223}1224func (s *elasticsearchIntegrationSuite) Test_LongWorkflowID() {1225 if s.testClusterConfig.Persistence.StoreType == config.StoreTypeSQL {1226 // TODO: remove this when workflow_id field size is increased from varchar(255) in SQL schema.1227 return1228 }1229 id := strings.Repeat("a", 1000)1230 wt := "es-integration-long-workflow-id-test-type"1231 tl := "es-integration-long-workflow-id-test-taskqueue"1232 request := s.createStartWorkflowExecutionRequest(id, wt, tl)1233 we, err := s.engine.StartWorkflowExecution(NewContext(), request)1234 s.NoError(err)1235 query := fmt.Sprintf(`WorkflowId = "%s"`, id)1236 s.testHelperForReadOnce(we.GetRunId(), query, false)1237}1238func (s *elasticsearchIntegrationSuite) putIndexSettings(indexName string, maxResultWindowSize int) {1239 acknowledged, err := s.esClient.IndexPutSettings(1240 context.Background(),1241 indexName,1242 fmt.Sprintf(`{"max_result_window" : %d}`, defaultTestValueOfESIndexMaxResultWindow))1243 s.Require().NoError(err)1244 s.Require().True(acknowledged)1245 s.verifyMaxResultWindowSize(indexName, defaultTestValueOfESIndexMaxResultWindow)1246}1247func (s *elasticsearchIntegrationSuite) verifyMaxResultWindowSize(indexName string, targetSize int) {1248 for i := 0; i < numOfRetry; i++ {1249 settings, err := s.esClient.IndexGetSettings(context.Background(), indexName)1250 s.Require().NoError(err)1251 if settings[indexName].Settings["index"].(map[string]interface{})["max_result_window"].(string) == strconv.Itoa(targetSize) {1252 return1253 }1254 time.Sleep(waitTimeInMs * time.Millisecond)1255 }1256 s.FailNow(fmt.Sprintf("ES max result window size hasn't reach target size within %v", (numOfRetry*waitTimeInMs)*time.Millisecond))1257}...

Full Screen

Full Screen

pool_test.go

Source:pool_test.go Github

copy

Full Screen

...66 stoppedThenStarted := false67 for i := 0; i < 200; i++ {68 if !wasStopped && testPool.GetStatus() == PoolStatusStopped.String() {69 wasStopped = true70 assert.GreaterOrEqual(t, sJob.getExecutions(), 1, "The scheduled job did not run at least once before failure")71 sJob.clearExecutions()72 assert.GreaterOrEqual(t, iJob.getExecutions(), 1, "The interval job did not run at least once before failure")73 iJob.clearExecutions()74 assert.GreaterOrEqual(t, failJob.getExecutions(), 1, "The failing interval did not run at least once before failure")75 failJob.clearExecutions()76 assert.GreaterOrEqual(t, diJob.getExecutions(), 1, "The detached interval job did not run at least once before other jobs were stopped")77 diJob.clearExecutions()78 assert.GreaterOrEqual(t, cJob.getExecutions(), 1, "The channel job did not run at least once before failure")79 cJob.clearExecutions()80 }81 if wasStopped && testPool.GetStatus() == PoolStatusRunning.String() {82 stoppedThenStarted = true83 }84 time.Sleep(10 * time.Millisecond)85 }86 time.Sleep(2 * time.Second) // give enough time for scheduled job to run at least once more87 assert.GreaterOrEqual(t, sJob.getExecutions(), 1, "The scheduled job did not run at least once after failure")88 assert.GreaterOrEqual(t, iJob.getExecutions(), 1, "The interval job did not run at least once after failure")89 assert.GreaterOrEqual(t, failJob.getExecutions(), 1, "The failing interval did not run at least once after failure")90 assert.GreaterOrEqual(t, diJob.getExecutions(), 1, "The detached interval did not run at least once after failure")91 assert.GreaterOrEqual(t, cJob.getExecutions(), 1, "The channel did not run at least once after failure")92 // just can't get these 2 to ever pass if tests are run with -race flag93 // assert.True(t, wasStopped, "The pool status never showed as stopped")94 // assert.True(t, stoppedThenStarted, "The pool status never restarted after it was stopped")95 // add this dummy statement that is needed if the asserts are commented out96 if stoppedThenStarted {97 }98 assert.True(t, failJob.getWasFailed(), "The fail job never reported as failed")99 assert.True(t, failJob.getWasRestored(), "The fail job was not restored after failure")100}...

Full Screen

Full Screen

execution.go

Source:execution.go Github

copy

Full Screen

...4 "net/http"5 "github.com/bmf-san/oilking/app/model"6)7const (8 // getExecutionsPath is a path for getexecutions endpoint.9 getExecutionsPath = "/me/getexecutions"10)11// GetExecutions gets executions.12func (c *Client) GetExecutions(executionParams *model.ExecutionParams) (*model.ExecutionResponse, error) {13 body, err := c.Do(http.MethodGet, getExecutionsPath, executionParams.MakeExecutionParams(), nil)14 if err != nil {15 return nil, err16 }17 var e model.ExecutionResponse18 if err = json.Unmarshal(body, &e); err != nil {19 return nil, err20 }21 return &e, nil22}...

Full Screen

Full Screen

getExecutions

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 fmt.Println(executions.GetExecutions())4}5import (6func main() {7 fmt.Println(executions.GetExecutions())8}9func GetExecutions() string {10}11func GetExecutions() string {12}13func GetExecutions() string {14}15func GetExecutions() string {16}17func GetExecutions() string {18}19func GetExecutions() string {20}21func GetExecutions() string {22}23func GetExecutions() string {24}25func GetExecutions() string {26}27func GetExecutions() string {28}29func GetExecutions() string {30}

Full Screen

Full Screen

getExecutions

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 executions.getExecutions()4}5import (6func getExecutions() {7 fmt.Println("Hello, world.")8}

Full Screen

Full Screen

getExecutions

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 start := time.Now()4 executions := getExecutions()5 fmt.Println("Executions: ", executions)6 fmt.Println("Executions: ", pretty.Sprint(executions))7 fmt.Println("Executions: ", pretty.Sprintf("%# v", executions))8 elapsed := time.Since(start)9 log.Printf("getExecutions took %s", elapsed)10}11import (12type Executions struct {13}14type Execution struct {

Full Screen

Full Screen

getExecutions

Using AI Code Generation

copy

Full Screen

1import (2var (3func main() {4 msgChan = make(chan string)5 errChan = make(chan error)6 vm = otto.New()7 vm.Set("getExecutions", getExecutions)8 go runScript()9 for {10 select {11 fmt.Println(msg)12 log.Fatal(err)13 }14 }15}16func runScript() {17 script, err := os.Open("script.js")18 if err != nil {19 }20 _, err = vm.Run(script)21 if err != nil {22 }23}24func getExecutions(call otto.FunctionCall) otto.Value {25 if len(args) != 1 {26 errChan <- fmt.Errorf("getExecutions: invalid number of arguments")27 return otto.NullValue()28 }29 if !args[0].IsString() {30 errChan <- fmt.Errorf("getExecutions: invalid argument type")31 return otto.NullValue()32 }33 funcName, err := args[0].ToString()34 if err != nil {35 return otto.NullValue()36 }37 funcVal, err := vm.Get("global." + funcName)38 if err != nil {39 return otto.NullValue()40 }41 if !funcVal.IsFunction() {42 errChan <- fmt.Errorf("getExecutions: %s is not a function", funcName)43 return otto.NullValue()44 }45 funcObj, err := funcVal.Object()46 if err != nil {

Full Screen

Full Screen

getExecutions

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 execution := aws.Executions{}4 fmt.Println(execution.GetExecutions())5}6import (7func main() {8 execution := aws.Executions{}9 fmt.Println(execution.GetExecutions())10}11import (12func main() {13 execution := aws.Executions{}14 fmt.Println(execution.GetExecutions())15}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Testkube automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful