How to use the getItems method of the execution package

Best Gauge code snippet using execution.getItems

management_test.go

Source:management_test.go Github

copy

Full Screen

1package k8s2import (3 "fmt"4 "testing"5 core2 "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"6 "github.com/flyteorg/flyteplugins/go/tasks/logs"7 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"8 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks"9 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"10 mocks2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks"11 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/workqueue"12 "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/arraystatus"13 arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core"14 "github.com/flyteorg/flytestdlib/bitarray"15 "github.com/flyteorg/flytestdlib/storage"16 stdmocks "github.com/flyteorg/flytestdlib/storage/mocks"17 "github.com/stretchr/testify/assert"18 "github.com/stretchr/testify/mock"19 "golang.org/x/net/context"20 structpb "google.golang.org/protobuf/types/known/structpb"21 v1 "k8s.io/api/core/v1"22 "k8s.io/apimachinery/pkg/api/resource"23 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"24)25type metadata struct {26 exists bool27 size int6428}29func (m metadata) Exists() bool {30 return m.exists31}32func (m metadata) Size() int64 {33 return m.size34}35func createSampleContainerTask() *core2.Container {36 return &core2.Container{37 Command: []string{"cmd"},38 Args: []string{"{{$inputPrefix}}"},39 Image: "img1",40 }41}42func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.TaskExecutionContext {43 customStruct, _ := structpb.NewStruct(map[string]interface{}{44 "parallelism": fmt.Sprintf("%d", parallelism),45 })46 tr := &mocks.TaskReader{}47 tr.OnRead(ctx).Return(&core2.TaskTemplate{48 Custom: customStruct,49 Target: &core2.TaskTemplate_Container{50 Container: createSampleContainerTask(),51 },52 }, nil)53 tID := &mocks.TaskExecutionID{}54 tID.OnGetGeneratedName().Return("notfound")55 tID.OnGetID().Return(core2.TaskExecutionIdentifier{56 TaskId: 
&core2.Identifier{57 ResourceType: core2.ResourceType_TASK,58 Project: "a",59 Domain: "d",60 Name: "n",61 Version: "abc",62 },63 NodeExecutionId: &core2.NodeExecutionIdentifier{64 NodeId: "node1",65 ExecutionId: &core2.WorkflowExecutionIdentifier{66 Project: "a",67 Domain: "d",68 Name: "exec",69 },70 },71 RetryAttempt: 0,72 })73 overrides := &mocks.TaskOverrides{}74 overrides.OnGetResources().Return(&v1.ResourceRequirements{75 Requests: v1.ResourceList{76 v1.ResourceCPU: resource.MustParse("10"),77 },78 })79 tMeta := &mocks.TaskExecutionMetadata{}80 tMeta.OnGetTaskExecutionID().Return(tID)81 tMeta.OnGetOverrides().Return(overrides)82 tMeta.OnIsInterruptible().Return(false)83 tMeta.OnGetK8sServiceAccount().Return("s")84 tMeta.OnGetSecurityContext().Return(core2.SecurityContext{})85 tMeta.OnGetMaxAttempts().Return(2)86 tMeta.OnGetNamespace().Return("n")87 tMeta.OnGetLabels().Return(nil)88 tMeta.OnGetAnnotations().Return(nil)89 tMeta.OnGetOwnerReference().Return(metav1.OwnerReference{})90 tMeta.OnGetPlatformResources().Return(&v1.ResourceRequirements{})91 tMeta.OnGetInterruptibleFailureThreshold().Return(2)92 ow := &mocks2.OutputWriter{}93 ow.OnGetOutputPrefixPath().Return("/prefix/")94 ow.OnGetRawOutputPrefix().Return("/raw_prefix/")95 ow.OnGetCheckpointPrefix().Return("/checkpoint")96 ow.OnGetPreviousCheckpointsPrefix().Return("/prev")97 ir := &mocks2.InputReader{}98 ir.OnGetInputPrefixPath().Return("/prefix/")99 ir.OnGetInputPath().Return("/prefix/inputs.pb")100 ir.OnGetMatch(mock.Anything).Return(&core2.LiteralMap{}, nil)101 composedProtobufStore := &stdmocks.ComposedProtobufStore{}102 matchedBy := mock.MatchedBy(func(s storage.DataReference) bool {103 return true104 })105 composedProtobufStore.On("Head", mock.Anything, matchedBy).Return(metadata{true, 0}, nil)106 dataStore := &storage.DataStore{107 ComposedProtobufStore: composedProtobufStore,108 ReferenceConstructor: &storage.URLPathConstructor{},109 }110 tCtx := &mocks.TaskExecutionContext{}111 
tCtx.OnTaskReader().Return(tr)112 tCtx.OnTaskExecutionMetadata().Return(tMeta)113 tCtx.OnOutputWriter().Return(ow)114 tCtx.OnInputReader().Return(ir)115 tCtx.OnDataStore().Return(dataStore)116 return tCtx117}118func TestCheckSubTasksState(t *testing.T) {119 ctx := context.Background()120 subtaskCount := 5121 config := Config{122 MaxArrayJobSize: int64(subtaskCount * 10),123 ResourceConfig: ResourceConfig{124 PrimaryLabel: "p",125 Limit: subtaskCount,126 },127 }128 fakeKubeClient := mocks.NewFakeKubeClient()129 fakeKubeCache := mocks.NewFakeKubeCache()130 for i := 0; i < subtaskCount; i++ {131 pod := flytek8s.BuildIdentityPod()132 pod.SetName(fmt.Sprintf("notfound-%d", i))133 pod.SetNamespace("a-n-b")134 pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{Name: "foo"})135 pod.Status.Phase = v1.PodRunning136 _ = fakeKubeClient.Create(ctx, pod)137 _ = fakeKubeCache.Create(ctx, pod)138 }139 failureFakeKubeClient := mocks.NewFakeKubeClient()140 failureFakeKubeCache := mocks.NewFakeKubeCache()141 for i := 0; i < subtaskCount; i++ {142 pod := flytek8s.BuildIdentityPod()143 pod.SetName(fmt.Sprintf("notfound-%d", i))144 pod.SetNamespace("a-n-b")145 pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{Name: "foo"})146 pod.Status.Phase = v1.PodFailed147 _ = failureFakeKubeClient.Create(ctx, pod)148 _ = failureFakeKubeCache.Create(ctx, pod)149 }150 t.Run("Launch", func(t *testing.T) {151 // initialize metadata152 kubeClient := mocks.KubeClient{}153 kubeClient.OnGetClient().Return(mocks.NewFakeKubeClient())154 kubeClient.OnGetCache().Return(mocks.NewFakeKubeCache())155 resourceManager := mocks.ResourceManager{}156 resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusGranted, nil)157 tCtx := getMockTaskExecutionContext(ctx, 0)158 tCtx.OnResourceManager().Return(&resourceManager)159 currentState := &arrayCore.State{160 CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,161 
ExecutionArraySize: subtaskCount,162 OriginalArraySize: int64(subtaskCount),163 OriginalMinSuccesses: int64(subtaskCount),164 ArrayStatus: arraystatus.ArrayStatus{165 Detailed: arrayCore.NewPhasesCompactArray(uint(subtaskCount)), // set all tasks to core.PhaseUndefined166 },167 IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached168 }169 // execute170 newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)171 // validate results172 assert.Nil(t, err)173 p, _ := newState.GetPhase()174 assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())175 resourceManager.AssertNumberOfCalls(t, "AllocateResource", subtaskCount)176 for _, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {177 assert.Equal(t, core.PhaseQueued, core.Phases[subtaskPhaseIndex])178 }179 })180 for i := 1; i <= subtaskCount; i++ {181 t.Run(fmt.Sprintf("LaunchParallelism%d", i), func(t *testing.T) {182 // initialize metadata183 kubeClient := mocks.KubeClient{}184 kubeClient.OnGetClient().Return(mocks.NewFakeKubeClient())185 kubeClient.OnGetCache().Return(mocks.NewFakeKubeCache())186 resourceManager := mocks.ResourceManager{}187 resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusGranted, nil)188 tCtx := getMockTaskExecutionContext(ctx, i)189 tCtx.OnResourceManager().Return(&resourceManager)190 currentState := &arrayCore.State{191 CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,192 ExecutionArraySize: subtaskCount,193 OriginalArraySize: int64(subtaskCount),194 OriginalMinSuccesses: int64(subtaskCount),195 ArrayStatus: arraystatus.ArrayStatus{196 Detailed: arrayCore.NewPhasesCompactArray(uint(subtaskCount)), // set all tasks to core.PhaseUndefined197 },198 IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), 
uint(subtaskCount)), // set all tasks to be cached199 }200 // execute201 newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)202 // validate results203 assert.Nil(t, err)204 p, _ := newState.GetPhase()205 assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())206 executed := 0207 for _, existingPhaseIdx := range newState.GetArrayStatus().Detailed.GetItems() {208 if core.Phases[existingPhaseIdx] != core.PhaseUndefined {209 executed++210 }211 }212 assert.Equal(t, i, executed)213 })214 }215 t.Run("LaunchResourcesExhausted", func(t *testing.T) {216 // initialize metadata217 kubeClient := mocks.KubeClient{}218 kubeClient.OnGetClient().Return(mocks.NewFakeKubeClient())219 kubeClient.OnGetCache().Return(mocks.NewFakeKubeCache())220 resourceManager := mocks.ResourceManager{}221 resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusExhausted, nil)222 tCtx := getMockTaskExecutionContext(ctx, 0)223 tCtx.OnResourceManager().Return(&resourceManager)224 currentState := &arrayCore.State{225 CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,226 ExecutionArraySize: subtaskCount,227 OriginalArraySize: int64(subtaskCount),228 OriginalMinSuccesses: int64(subtaskCount),229 ArrayStatus: arraystatus.ArrayStatus{230 Detailed: arrayCore.NewPhasesCompactArray(uint(subtaskCount)), // set all tasks to core.PhaseUndefined231 },232 IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached233 }234 // execute235 newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)236 // validate results237 assert.Nil(t, err)238 p, _ := newState.GetPhase()239 assert.Equal(t, arrayCore.PhaseWaitingForResources.String(), p.String())240 resourceManager.AssertNumberOfCalls(t, "AllocateResource", 
subtaskCount)241 for _, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {242 assert.Equal(t, core.PhaseWaitingForResources, core.Phases[subtaskPhaseIndex])243 }244 // execute again - with resources available and validate results245 nresourceManager := mocks.ResourceManager{}246 nresourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusGranted, nil)247 ntCtx := getMockTaskExecutionContext(ctx, 0)248 ntCtx.OnResourceManager().Return(&nresourceManager)249 lastState, _, err := LaunchAndCheckSubTasksState(ctx, ntCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", newState)250 assert.Nil(t, err)251 np, _ := lastState.GetPhase()252 assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), np.String())253 resourceManager.AssertNumberOfCalls(t, "AllocateResource", subtaskCount)254 for _, subtaskPhaseIndex := range lastState.GetArrayStatus().Detailed.GetItems() {255 assert.Equal(t, core.PhaseQueued, core.Phases[subtaskPhaseIndex])256 }257 })258 t.Run("LaunchRetryableFailures", func(t *testing.T) {259 // initialize metadata260 kubeClient := mocks.KubeClient{}261 kubeClient.OnGetClient().Return(fakeKubeClient)262 kubeClient.OnGetCache().Return(fakeKubeCache)263 resourceManager := mocks.ResourceManager{}264 resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusGranted, nil)265 tCtx := getMockTaskExecutionContext(ctx, 0)266 tCtx.OnResourceManager().Return(&resourceManager)267 detailed := arrayCore.NewPhasesCompactArray(uint(subtaskCount))268 for i := 0; i < subtaskCount; i++ {269 detailed.SetItem(i, bitarray.Item(core.PhaseRetryableFailure)) // set all tasks to core.PhaseRetryableFailure270 }271 retryAttemptsArray, err := bitarray.NewCompactArray(uint(subtaskCount), bitarray.Item(1))272 assert.NoError(t, err)273 currentState := &arrayCore.State{274 CurrentPhase: 
arrayCore.PhaseCheckingSubTaskExecutions,275 ExecutionArraySize: subtaskCount,276 OriginalArraySize: int64(subtaskCount),277 OriginalMinSuccesses: int64(subtaskCount),278 ArrayStatus: arraystatus.ArrayStatus{279 Detailed: detailed,280 },281 IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached282 RetryAttempts: retryAttemptsArray,283 }284 // execute285 newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)286 // validate results287 assert.Nil(t, err)288 p, _ := newState.GetPhase()289 assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())290 resourceManager.AssertNumberOfCalls(t, "AllocateResource", subtaskCount)291 for i, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {292 assert.Equal(t, core.PhaseQueued, core.Phases[subtaskPhaseIndex])293 assert.Equal(t, bitarray.Item(1), newState.RetryAttempts.GetItem(i))294 }295 })296 t.Run("RunningLogLinksAndSubtaskIDs", func(t *testing.T) {297 // initialize metadata298 config := Config{299 MaxArrayJobSize: 100,300 MaxErrorStringLength: 200,301 NamespaceTemplate: "a-{{.namespace}}-b",302 OutputAssembler: workqueue.Config{303 Workers: 2,304 MaxRetries: 0,305 IndexCacheMaxItems: 100,306 },307 ErrorAssembler: workqueue.Config{308 Workers: 2,309 MaxRetries: 0,310 IndexCacheMaxItems: 100,311 },312 LogConfig: LogConfig{313 Config: logs.LogConfig{314 IsCloudwatchEnabled: true,315 CloudwatchTemplateURI: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.{{ .podName }};streamFilter=typeLogStreamPrefix",316 IsKubernetesEnabled: true,317 KubernetesTemplateURI: "k8s/log/{{.namespace}}/{{.podName}}/pod?namespace={{.namespace}}",318 }},319 }320 kubeClient := mocks.KubeClient{}321 kubeClient.OnGetClient().Return(fakeKubeClient)322 
kubeClient.OnGetCache().Return(fakeKubeCache)323 resourceManager := mocks.ResourceManager{}324 resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusExhausted, nil)325 tCtx := getMockTaskExecutionContext(ctx, 0)326 tCtx.OnResourceManager().Return(&resourceManager)327 detailed := arrayCore.NewPhasesCompactArray(uint(subtaskCount))328 for i := 0; i < subtaskCount; i++ {329 detailed.SetItem(i, bitarray.Item(core.PhaseRunning)) // set all tasks to core.PhaseRunning330 }331 currentState := &arrayCore.State{332 CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,333 ExecutionArraySize: subtaskCount,334 OriginalArraySize: int64(subtaskCount),335 OriginalMinSuccesses: int64(subtaskCount),336 ArrayStatus: arraystatus.ArrayStatus{337 Detailed: detailed,338 },339 IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached340 }341 // execute342 newState, externalResources, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)343 // validate results344 assert.Nil(t, err)345 p, _ := newState.GetPhase()346 assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())347 resourceManager.AssertNumberOfCalls(t, "AllocateResource", 0)348 resourceManager.AssertNumberOfCalls(t, "ReleaseResource", 0)349 assert.Equal(t, subtaskCount, len(externalResources))350 for i := 0; i < subtaskCount; i++ {351 externalResource := externalResources[i]352 assert.Equal(t, fmt.Sprintf("notfound-%d", i), externalResource.ExternalID)353 logLinks := externalResource.Logs354 assert.Equal(t, 2, len(logLinks))355 assert.Equal(t, fmt.Sprintf("Kubernetes Logs #0-%d", i), logLinks[0].Name)356 assert.Equal(t, fmt.Sprintf("k8s/log/a-n-b/notfound-%d/pod?namespace=a-n-b", i), logLinks[0].Uri)357 assert.Equal(t, fmt.Sprintf("Cloudwatch Logs #0-%d", i), logLinks[1].Name)358 assert.Equal(t, 
fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.notfound-%d;streamFilter=typeLogStreamPrefix", i), logLinks[1].Uri)359 }360 })361 t.Run("RunningRetryableFailures", func(t *testing.T) {362 // initialize metadata363 kubeClient := mocks.KubeClient{}364 kubeClient.OnGetClient().Return(failureFakeKubeClient)365 kubeClient.OnGetCache().Return(failureFakeKubeCache)366 resourceManager := mocks.ResourceManager{}367 resourceManager.OnReleaseResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)368 tCtx := getMockTaskExecutionContext(ctx, 0)369 tCtx.OnResourceManager().Return(&resourceManager)370 detailed := arrayCore.NewPhasesCompactArray(uint(subtaskCount))371 for i := 0; i < subtaskCount; i++ {372 detailed.SetItem(i, bitarray.Item(core.PhaseRunning)) // set all tasks to core.PhaseRunning373 }374 retryAttemptsArray, err := bitarray.NewCompactArray(uint(subtaskCount), bitarray.Item(1))375 assert.NoError(t, err)376 currentState := &arrayCore.State{377 CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,378 ExecutionArraySize: subtaskCount,379 OriginalArraySize: int64(subtaskCount),380 OriginalMinSuccesses: int64(subtaskCount),381 ArrayStatus: arraystatus.ArrayStatus{382 Detailed: detailed,383 },384 IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached385 RetryAttempts: retryAttemptsArray,386 }387 // execute388 newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, tCtx.DataStore(), "/prefix/", "/prefix-sand/", currentState)389 // validate results390 assert.Nil(t, err)391 p, _ := newState.GetPhase()392 assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())393 resourceManager.AssertNumberOfCalls(t, "ReleaseResource", subtaskCount)394 for _, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {395 assert.Equal(t, 
core.PhaseRetryableFailure, core.Phases[subtaskPhaseIndex])396 }397 })398 t.Run("RunningPermanentFailures", func(t *testing.T) {399 // initialize metadata400 kubeClient := mocks.KubeClient{}401 kubeClient.OnGetClient().Return(failureFakeKubeClient)402 kubeClient.OnGetCache().Return(failureFakeKubeCache)403 resourceManager := mocks.ResourceManager{}404 resourceManager.OnReleaseResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)405 tCtx := getMockTaskExecutionContext(ctx, 0)406 tCtx.OnResourceManager().Return(&resourceManager)407 detailed := arrayCore.NewPhasesCompactArray(uint(subtaskCount))408 for i := 0; i < subtaskCount; i++ {409 detailed.SetItem(i, bitarray.Item(core.PhaseRunning)) // set all tasks to core.PhaseRunning410 }411 retryAttemptsArray, err := bitarray.NewCompactArray(uint(subtaskCount), bitarray.Item(1))412 assert.NoError(t, err)413 for i := 0; i < subtaskCount; i++ {414 retryAttemptsArray.SetItem(i, bitarray.Item(1))415 }416 currentState := &arrayCore.State{417 CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,418 ExecutionArraySize: subtaskCount,419 OriginalArraySize: int64(subtaskCount),420 OriginalMinSuccesses: int64(subtaskCount),421 ArrayStatus: arraystatus.ArrayStatus{422 Detailed: detailed,423 },424 IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached425 RetryAttempts: retryAttemptsArray,426 }427 // execute428 newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, tCtx.DataStore(), "/prefix/", "/prefix-sand/", currentState)429 // validate results430 assert.Nil(t, err)431 p, _ := newState.GetPhase()432 assert.Equal(t, arrayCore.PhaseWriteToDiscoveryThenFail.String(), p.String())433 resourceManager.AssertNumberOfCalls(t, "ReleaseResource", subtaskCount)434 for _, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {435 assert.Equal(t, core.PhasePermanentFailure, 
core.Phases[subtaskPhaseIndex])436 }437 })438}...

Full Screen

Full Screen

management.go

Source:management.go Github

copy

Full Screen

1package k8s2import (3 "context"4 "fmt"5 "time"6 idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"7 "github.com/flyteorg/flyteplugins/go/tasks/errors"8 "github.com/flyteorg/flyteplugins/go/tasks/logs"9 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"10 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io"11 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"12 "github.com/flyteorg/flyteplugins/go/tasks/plugins/array"13 "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/arraystatus"14 arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core"15 "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/errorcollector"16 "github.com/flyteorg/flytestdlib/bitarray"17 "github.com/flyteorg/flytestdlib/logger"18 "github.com/flyteorg/flytestdlib/storage"19)20// allocateResource attempts to allot resources for the specified parameter with the21// TaskExecutionContexts ResourceManager.22func allocateResource(ctx context.Context, tCtx core.TaskExecutionContext, config *Config, podName string) (core.AllocationStatus, error) {23 if !IsResourceConfigSet(config.ResourceConfig) {24 return core.AllocationStatusGranted, nil25 }26 resourceNamespace := core.ResourceNamespace(config.ResourceConfig.PrimaryLabel)27 resourceConstraintSpec := core.ResourceConstraintsSpec{28 ProjectScopeResourceConstraint: nil,29 NamespaceScopeResourceConstraint: nil,30 }31 allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, resourceNamespace, podName, resourceConstraintSpec)32 if err != nil {33 return core.AllocationUndefined, err34 }35 return allocationStatus, nil36}37// deallocateResource attempts to release resources for the specified parameter with the38// TaskExecutionContexts ResourceManager.39func deallocateResource(ctx context.Context, tCtx core.TaskExecutionContext, config *Config, podName string) error {40 if !IsResourceConfigSet(config.ResourceConfig) {41 return nil42 }43 resourceNamespace := 
core.ResourceNamespace(config.ResourceConfig.PrimaryLabel)44 err := tCtx.ResourceManager().ReleaseResource(ctx, resourceNamespace, podName)45 if err != nil {46 logger.Errorf(ctx, "Error releasing token [%s]. error %s", podName, err)47 return err48 }49 return nil50}51// LaunchAndCheckSubTasksState iterates over each subtask performing operations to transition them52// to a terminal state. This may include creating new k8s resources, monitoring existing k8s53// resources, retrying failed attempts, or declaring a permanent failure among others.54func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient,55 config *Config, dataStore *storage.DataStore, outputPrefix, baseOutputDataSandbox storage.DataReference, currentState *arrayCore.State) (56 newState *arrayCore.State, externalResources []*core.ExternalResource, err error) {57 if int64(currentState.GetExecutionArraySize()) > config.MaxArrayJobSize {58 ee := fmt.Errorf("array size > max allowed. Requested [%v]. 
Allowed [%v]", currentState.GetExecutionArraySize(), config.MaxArrayJobSize)59 logger.Info(ctx, ee)60 currentState = currentState.SetPhase(arrayCore.PhasePermanentFailure, 0).SetReason(ee.Error())61 return currentState, externalResources, nil62 }63 newState = currentState64 messageCollector := errorcollector.NewErrorMessageCollector()65 newArrayStatus := &arraystatus.ArrayStatus{66 Summary: arraystatus.ArraySummary{},67 Detailed: arrayCore.NewPhasesCompactArray(uint(currentState.GetExecutionArraySize())),68 }69 externalResources = make([]*core.ExternalResource, 0, len(currentState.GetArrayStatus().Detailed.GetItems()))70 // If we have arrived at this state for the first time then currentState has not been71 // initialized with number of sub tasks.72 if len(currentState.GetArrayStatus().Detailed.GetItems()) == 0 {73 currentState.ArrayStatus = *newArrayStatus74 }75 // If the current State is newly minted then we must initialize RetryAttempts to track how many76 // times each subtask is executed.77 if len(currentState.RetryAttempts.GetItems()) == 0 {78 count := uint(currentState.GetExecutionArraySize())79 maxValue := bitarray.Item(tCtx.TaskExecutionMetadata().GetMaxAttempts())80 retryAttemptsArray, err := bitarray.NewCompactArray(count, maxValue)81 if err != nil {82 logger.Errorf(ctx, "Failed to create attempts compact array with [count: %v, maxValue: %v]", count, maxValue)83 return currentState, externalResources, nil84 }85 // Initialize subtask retryAttempts to 0 so that, in tandem with the podName logic, we86 // maintain backwards compatibility.87 for i := 0; i < currentState.GetExecutionArraySize(); i++ {88 retryAttemptsArray.SetItem(i, 0)89 }90 currentState.RetryAttempts = retryAttemptsArray91 }92 // If the current State is newly minted then we must initialize SystemFailures to track how many93 // times the subtask failed due to system issues, this is necessary to correctly evaluate94 // interruptible subtasks.95 if len(currentState.SystemFailures.GetItems()) == 
0 {96 count := uint(currentState.GetExecutionArraySize())97 maxValue := bitarray.Item(tCtx.TaskExecutionMetadata().GetInterruptibleFailureThreshold())98 systemFailuresArray, err := bitarray.NewCompactArray(count, maxValue)99 if err != nil {100 logger.Errorf(ctx, "Failed to create system failures array with [count: %v, maxValue: %v]", count, maxValue)101 return currentState, externalResources, err102 }103 for i := 0; i < currentState.GetExecutionArraySize(); i++ {104 systemFailuresArray.SetItem(i, 0)105 }106 currentState.SystemFailures = systemFailuresArray107 }108 // initialize log plugin109 logPlugin, err := logs.InitializeLogPlugins(&config.LogConfig.Config)110 if err != nil {111 return currentState, externalResources, err112 }113 // identify max parallelism114 taskTemplate, err := tCtx.TaskReader().Read(ctx)115 if err != nil {116 return currentState, externalResources, err117 } else if taskTemplate == nil {118 return currentState, externalResources, errors.Errorf(errors.BadTaskSpecification, "Required value not set, taskTemplate is nil")119 }120 arrayJob, err := arrayCore.ToArrayJob(taskTemplate.GetCustom(), taskTemplate.TaskTypeVersion)121 if err != nil {122 return currentState, externalResources, err123 }124 currentParallelism := 0125 maxParallelism := int(arrayJob.Parallelism)126 currentSubTaskPhaseHash, err := currentState.GetArrayStatus().HashCode()127 if err != nil {128 return currentState, externalResources, err129 }130 for childIdx, existingPhaseIdx := range currentState.GetArrayStatus().Detailed.GetItems() {131 existingPhase := core.Phases[existingPhaseIdx]132 retryAttempt := currentState.RetryAttempts.GetItem(childIdx)133 if existingPhase == core.PhaseRetryableFailure {134 retryAttempt++135 newState.RetryAttempts.SetItem(childIdx, retryAttempt)136 } else if existingPhase.IsTerminal() {137 newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(existingPhase))138 continue139 }140 originalIdx := arrayCore.CalculateOriginalIndex(childIdx, 
newState.GetIndexesToCache())141 systemFailures := currentState.SystemFailures.GetItem(childIdx)142 stCtx, err := NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, childIdx, originalIdx, retryAttempt, systemFailures)143 if err != nil {144 return currentState, externalResources, err145 }146 podName := stCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()147 // depending on the existing subtask phase we either a launch new k8s resource or monitor148 // an existing instance149 var phaseInfo core.PhaseInfo150 var perr error151 if existingPhase == core.PhaseUndefined || existingPhase == core.PhaseWaitingForResources || existingPhase == core.PhaseRetryableFailure {152 // attempt to allocateResource153 allocationStatus, err := allocateResource(ctx, stCtx, config, podName)154 if err != nil {155 logger.Errorf(ctx, "Resource manager failed for TaskExecId [%s] token [%s]. error %s",156 stCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), podName, err)157 return currentState, externalResources, err158 }159 logger.Infof(ctx, "Allocation result for [%s] is [%s]", podName, allocationStatus)160 if allocationStatus != core.AllocationStatusGranted {161 phaseInfo = core.PhaseInfoWaitingForResourcesInfo(time.Now(), core.DefaultPhaseVersion, "Exceeded ResourceManager quota", nil)162 } else {163 phaseInfo, perr = launchSubtask(ctx, stCtx, config, kubeClient)164 // if launchSubtask fails we attempt to deallocate the (previously allocated)165 // resource to mitigate leaks166 if perr != nil {167 perr = deallocateResource(ctx, stCtx, config, podName)168 if perr != nil {169 logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", podName, err)170 }171 }172 }173 } else {174 phaseInfo, perr = getSubtaskPhaseInfo(ctx, stCtx, config, kubeClient, logPlugin)175 }176 if perr != nil {177 return currentState, externalResources, perr178 }179 if phaseInfo.Err() != nil {180 messageCollector.Collect(childIdx, phaseInfo.Err().String())181 // If the service 
reported an error but there is no error.pb written, write one with the182 // service-provided error message.183 or, err := array.ConstructOutputReader(ctx, dataStore, outputPrefix, baseOutputDataSandbox, originalIdx)184 if err != nil {185 return currentState, externalResources, err186 }187 if hasErr, err := or.IsError(ctx); err != nil {188 return currentState, externalResources, err189 } else if !hasErr {190 // The subtask has not produced an error.pb, write one.191 ow, err := array.ConstructOutputWriter(ctx, dataStore, outputPrefix, baseOutputDataSandbox, originalIdx)192 if err != nil {193 return currentState, externalResources, err194 }195 if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, &io.ExecutionError{196 ExecutionError: phaseInfo.Err(),197 IsRecoverable: phaseInfo.Phase() != core.PhasePermanentFailure,198 })); err != nil {199 return currentState, externalResources, err200 }201 }202 }203 if phaseInfo.Err() != nil && phaseInfo.Err().GetKind() == idlCore.ExecutionError_SYSTEM {204 newState.SystemFailures.SetItem(childIdx, systemFailures+1)205 } else {206 newState.SystemFailures.SetItem(childIdx, systemFailures)207 }208 // process subtask phase209 actualPhase := phaseInfo.Phase()210 if actualPhase.IsSuccess() {211 actualPhase, err = array.CheckTaskOutput(ctx, dataStore, outputPrefix, baseOutputDataSandbox, childIdx, originalIdx)212 if err != nil {213 return currentState, externalResources, err214 }215 }216 if actualPhase == core.PhaseRetryableFailure && uint32(retryAttempt+1) >= stCtx.TaskExecutionMetadata().GetMaxAttempts() {217 // If we see a retryable failure we must check if the number of retries exceeds the maximum218 // attempts. 
If so, transition to a permanent failure so that is not attempted again.219 actualPhase = core.PhasePermanentFailure220 }221 newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(actualPhase))222 if actualPhase.IsTerminal() {223 err = deallocateResource(ctx, stCtx, config, podName)224 if err != nil {225 logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", podName, err)226 return currentState, externalResources, err227 }228 err = finalizeSubtask(ctx, stCtx, config, kubeClient)229 if err != nil {230 logger.Errorf(ctx, "Error finalizing resource [%s] in Finalize [%s]", podName, err)231 return currentState, externalResources, err232 }233 }234 // process phaseInfo235 var logLinks []*idlCore.TaskLog236 if phaseInfo.Info() != nil {237 logLinks = phaseInfo.Info().Logs238 }239 externalResources = append(externalResources, &core.ExternalResource{240 ExternalID: podName,241 Index: uint32(originalIdx),242 Logs: logLinks,243 RetryAttempt: uint32(retryAttempt),244 Phase: actualPhase,245 })246 // validate parallelism247 if !actualPhase.IsTerminal() || actualPhase == core.PhaseRetryableFailure {248 currentParallelism++249 }250 if maxParallelism != 0 && currentParallelism >= maxParallelism {251 break252 }253 }254 // compute task phase from array status summary255 for _, phaseIdx := range newArrayStatus.Detailed.GetItems() {256 newArrayStatus.Summary.Inc(core.Phases[phaseIdx])257 }258 phase := arrayCore.SummaryToPhase(ctx, currentState.GetOriginalMinSuccesses()-currentState.GetOriginalArraySize()+int64(currentState.GetExecutionArraySize()), newArrayStatus.Summary)259 // process new state260 newState = newState.SetArrayStatus(*newArrayStatus)261 if phase == arrayCore.PhaseWriteToDiscoveryThenFail {262 errorMsg := messageCollector.Summary(GetConfig().MaxErrorStringLength)263 newState = newState.SetReason(errorMsg)264 }265 _, version := currentState.GetPhase()266 if phase == arrayCore.PhaseCheckingSubTaskExecutions {267 newSubTaskPhaseHash, err := 
newState.GetArrayStatus().HashCode()268 if err != nil {269 return currentState, externalResources, err270 }271 if newSubTaskPhaseHash != currentSubTaskPhaseHash {272 version++273 }274 newState = newState.SetPhase(phase, version).SetReason("Task is still running")275 } else {276 newState = newState.SetPhase(phase, version)277 }278 return newState, externalResources, nil279}

// TerminateSubTasks performs operations to gracefully terminate all subtasks. This may include
// aborting and finalizing active k8s resources.
//
// It reads the task template from tCtx, then walks every entry of the detailed array status in
// currentState. Subtasks whose phase is terminal or core.PhaseUndefined are skipped; for every
// other subtask a SubTaskExecutionContext is built and terminateFunction is invoked with it.
//
// A failure to read the template (or a nil template) or to build a subtask context aborts
// immediately with that error. Errors returned by terminateFunction do not stop the loop: they
// are collected per child index and returned at the end as one error whose text is
// messageCollector.Summary(config.MaxErrorStringLength). nil is returned when nothing failed.
func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient, config *Config,
	terminateFunction func(context.Context, SubTaskExecutionContext, *Config, core.KubeClient) error, currentState *arrayCore.State) error {

	taskTemplate, err := tCtx.TaskReader().Read(ctx)
	if err != nil {
		return err
	}
	if taskTemplate == nil {
		return errors.Errorf(errors.BadTaskSpecification, "Required value not set, taskTemplate is nil")
	}

	messageCollector := errorcollector.NewErrorMessageCollector()
	for childIdx, existingPhaseIdx := range currentState.GetArrayStatus().Detailed.GetItems() {
		existingPhase := core.Phases[existingPhaseIdx]
		retryAttempt := uint64(0)
		if childIdx < len(currentState.RetryAttempts.GetItems()) {
			// we can use RetryAttempts if it has been initialized, otherwise stay with default 0
			retryAttempt = currentState.RetryAttempts.GetItem(childIdx)
		}

		// skip this subtask if it has completed or not yet started
		if existingPhase.IsTerminal() || existingPhase == core.PhaseUndefined {
			continue
		}

		originalIdx := arrayCore.CalculateOriginalIndex(childIdx, currentState.GetIndexesToCache())
		stCtx, err := NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, childIdx, originalIdx, retryAttempt, 0)
		if err != nil {
			return err
		}

		// keep going on failure so that every remaining subtask still gets a termination attempt
		err = terminateFunction(ctx, stCtx, config, kubeClient)
		if err != nil {
			messageCollector.Collect(childIdx, err.Error())
		}
	}

	if messageCollector.Length() > 0 {
		// The summary is data, not a format string: pass it through "%s" so that a '%' inside a
		// collected error message is not interpreted as a formatting verb.
		return fmt.Errorf("%s", messageCollector.Summary(config.MaxErrorStringLength))
	}

	return nil
}

// ...

Full Screen

Full Screen

zz_generated.managedlist.go

Source:zz_generated.managedlist.go Github

copy

Full Screen

1/*2Copyright 2021 The Crossplane Authors.3Licensed under the Apache License, Version 2.0 (the "License");4you may not use this file except in compliance with the License.5You may obtain a copy of the License at6 http://www.apache.org/licenses/LICENSE-2.07Unless required by applicable law or agreed to in writing, software8distributed under the License is distributed on an "AS IS" BASIS,9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.10See the License for the specific language governing permissions and11limitations under the License.12*/13// Code generated by angryjet. DO NOT EDIT.14package v1alpha115import resource "github.com/crossplane/crossplane-runtime/pkg/resource"16// GetItems of this BindingsList.17func (l *BindingsList) GetItems() []resource.Managed {18 items := make([]resource.Managed, len(l.Items))19 for i := range l.Items {20 items[i] = &l.Items[i]21 }22 return items23}24// GetItems of this ExecutionConfigList.25func (l *ExecutionConfigList) GetItems() []resource.Managed {26 items := make([]resource.Managed, len(l.Items))27 for i := range l.Items {28 items[i] = &l.Items[i]29 }30 return items31}32// GetItems of this ExecutionList.33func (l *ExecutionList) GetItems() []resource.Managed {34 items := make([]resource.Managed, len(l.Items))35 for i := range l.Items {36 items[i] = &l.Items[i]37 }38 return items39}40// GetItems of this FlowList.41func (l *FlowList) GetItems() []resource.Managed {42 items := make([]resource.Managed, len(l.Items))43 for i := range l.Items {44 items[i] = &l.Items[i]45 }46 return items47}48// GetItems of this SubflowList.49func (l *SubflowList) GetItems() []resource.Managed {50 items := make([]resource.Managed, len(l.Items))51 for i := range l.Items {52 items[i] = &l.Items[i]53 }54 return items55}...

Full Screen

Full Screen

getItems

Using AI Code Generation

copy

Full Screen

import (
	"fmt"
	"sync"
)

// main starts one goroutine that prints "Hello" and blocks until that
// goroutine has finished.
func main() {
	// The zero value of sync.WaitGroup is ready to use.
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		// Done is deferred so the waiter is released even if the print panics.
		defer wg.Done()
		fmt.Println("Hello")
	}()
	wg.Wait()
}

Full Screen

Full Screen

getItems

Using AI Code Generation

copy

Full Screen

1import (2type Execution struct {3}4type Item struct {5}6func getItems() []Item {7 item1 := Item{ItemId: 1, ItemName: "Item1", ItemStatus: "Running", ItemDuration: "00:00:00", ItemStartDate: "2016-04-20", ItemStartTime: "09:00:00", ItemEndDate: "2016-04-20", ItemEndTime: "09:00:00", ItemExecutionId: 1}8 item2 := Item{ItemId: 2, ItemName: "Item2", ItemStatus: "Running", ItemDuration: "00:00:00", ItemStartDate: "2016-04-20", ItemStartTime: "09:00:00", ItemEndDate: "2016-04-20", ItemEndTime: "09:00:00", ItemExecutionId: 1}9 item3 := Item{ItemId: 3, ItemName: "Item3", ItemStatus: "Running", ItemDuration: "00:00:00", ItemStartDate: "2016-04-20", ItemStartTime: "09:00:00", ItemEndDate: "2016-04-20", ItemEndTime: "09:00:00", ItemExecutionId: 1}10 item4 := Item{ItemId: 4, ItemName: "Item4", ItemStatus: "Running", ItemDuration: "00:00:00", ItemStartDate: "2016-04-20", ItemStartTime: "09:00:00", ItemEndDate: "2016-04-20", ItemEndTime: "09:00:00", ItemExecutionId:

Full Screen

Full Screen

getItems

Using AI Code Generation

copy

Full Screen

import (
	"fmt"
	"reflect"
)

// execution is an empty type that only exists to carry the getItems method.
type execution struct{}

// getItems returns a fixed list of three item names.
func (e execution) getItems() []string {
	return []string{"item1", "item2", "item3"}
}

// main prints the receiver type name of getItems, the method name, and the
// number of inputs and outputs of the method.
func main() {
	e := execution{}

	// reflect's MethodByName only exposes exported methods, so the unexported
	// getItems is reached through a method expression instead. The resulting
	// function type is func(execution) []string.
	methodType := reflect.TypeOf(execution.getItems)

	// Take the type name from the value itself; splitting the qualified
	// function symbol on "." would yield the package path, not the type.
	className := reflect.TypeOf(e).Name()

	// NumIn counts the receiver as the first input of a method expression.
	argumentCount := methodType.NumIn()
	returnCount := methodType.NumOut()

	fmt.Println("Class Name:", className)
	fmt.Println("Method Name:", "getItems")
	fmt.Println("Argument Count:", argumentCount)
	fmt.Println("Return Count:", returnCount)
}

Full Screen

Full Screen

getItems

Using AI Code Generation

copy

Full Screen

1import "fmt"2func main() {3 fmt.Println("Hello, playground")4 exec.getItems()5}6import "fmt"7type execution struct {8}9func (e execution) getItems() {10 fmt.Println("getItems")11 fmt.Println(e.items)12}13import "fmt"14func main() {15 fmt.Println("Hello, playground")16 exec.getItems()17}18import "fmt"19type execution struct {20}21func (e execution) getItems() {22 fmt.Println("getItems")23 fmt.Println(e.items)24}25import "fmt"26func main() {27 fmt.Println("Hello, playground")28 exec.getItems()29}30import "

Full Screen

Full Screen

getItems

Using AI Code Generation

copy

Full Screen

import "fmt"

// main prints a greeting, then creates an Execution and calls getItems on it.
func main() {
	fmt.Println("Hello, playground")
	execution := new(Execution)
	execution.getItems()
}

// Execution holds the item names assigned by getItems.
type Execution struct {
	// items is written by getItems; it is nil until getItems has been called.
	items []string
}

// getItems stores three fixed item names on the receiver and prints them.
func (e *Execution) getItems() {
	e.items = []string{"item1", "item2", "item3"}
	fmt.Println(e.items)
}

Full Screen

Full Screen

getItems

Using AI Code Generation

copy

Full Screen

import "fmt"

// main builds a GetItems value, fetches its items and prints them.
func main() {
	// GetItems is declared in this same listing, so it is referenced without a
	// package qualifier.
	var executionObj = GetItems{}
	var items = executionObj.GetItems()
	fmt.Println(items)
}

// GetItems is an empty type that only exists to carry the GetItems method.
type GetItems struct{}

// GetItems returns a fixed list of two item names.
func (g GetItems) GetItems() []string {
	items := []string{"item1", "item2"}
	return items
}

Full Screen

Full Screen

getItems

Using AI Code Generation

copy

Full Screen

1import (2var tmpl = template.Must(template.ParseGlob("form/*"))3func Index(w http.ResponseWriter, r *http.Request) {4 tmpl.ExecuteTemplate(w, "Index", nil)5}6func getItems(w http.ResponseWriter, r *http.Request) {7 items := execution.GetItems()8 tmpl.ExecuteTemplate(w, "getItems", items)9}10func main() {11 router := mux.NewRouter()12 router.HandleFunc("/", Index)13 router.HandleFunc("/getItems", getItems).Methods("GET")14 http.ListenAndServe(":8000", router)15}16{{define "getItems"}}17 {{range .}}18 <td>{{.ID}}</td>19 <td>{{.Name}}</td>20 <td>{{.Price}}</td>21 {{end}}22{{end}}23{{define "Index"}}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Gauge automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Most used method in

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful