How to use the getItems method of the execution package

Best Gauge code snippets using execution.getItems

management_test.go

Source: management_test.go (GitHub)


package k8s

import (
	"fmt"
	"testing"

	core2 "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/flyteorg/flyteplugins/go/tasks/logs"
	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks"
	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
	mocks2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks"
	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/workqueue"
	"github.com/flyteorg/flyteplugins/go/tasks/plugins/array/arraystatus"
	arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core"
	"github.com/flyteorg/flytestdlib/bitarray"
	"github.com/flyteorg/flytestdlib/storage"
	stdmocks "github.com/flyteorg/flytestdlib/storage/mocks"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
	"golang.org/x/net/context"
	structpb "google.golang.org/protobuf/types/known/structpb"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type metadata struct {
	exists bool
	size   int64
}

func (m metadata) Exists() bool {
	return m.exists
}

func (m metadata) Size() int64 {
	return m.size
}

func createSampleContainerTask() *core2.Container {
	return &core2.Container{
		Command: []string{"cmd"},
		Args:    []string{"{{$inputPrefix}}"},
		Image:   "img1",
	}
}

func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.TaskExecutionContext {
	customStruct, _ := structpb.NewStruct(map[string]interface{}{
		"parallelism": fmt.Sprintf("%d", parallelism),
	})
	tr := &mocks.TaskReader{}
	tr.OnRead(ctx).Return(&core2.TaskTemplate{
		Custom: customStruct,
		Target: &core2.TaskTemplate_Container{
			Container: createSampleContainerTask(),
		},
	}, nil)
	tID := &mocks.TaskExecutionID{}
	tID.OnGetGeneratedName().Return("notfound")
	tID.OnGetID().Return(core2.TaskExecutionIdentifier{
		TaskId: &core2.Identifier{
			ResourceType: core2.ResourceType_TASK,
			Project:      "a",
			Domain:       "d",
			Name:         "n",
			Version:      "abc",
		},
		NodeExecutionId: &core2.NodeExecutionIdentifier{
			NodeId: "node1",
			ExecutionId: &core2.WorkflowExecutionIdentifier{
				Project: "a",
				Domain:  "d",
				Name:    "exec",
			},
		},
		RetryAttempt: 0,
	})
	overrides := &mocks.TaskOverrides{}
	overrides.OnGetResources().Return(&v1.ResourceRequirements{
		Requests: v1.ResourceList{
			v1.ResourceCPU: resource.MustParse("10"),
		},
	})
	tMeta := &mocks.TaskExecutionMetadata{}
	tMeta.OnGetTaskExecutionID().Return(tID)
	tMeta.OnGetOverrides().Return(overrides)
	tMeta.OnIsInterruptible().Return(false)
	tMeta.OnGetK8sServiceAccount().Return("s")
	tMeta.OnGetSecurityContext().Return(core2.SecurityContext{})
	tMeta.OnGetMaxAttempts().Return(2)
	tMeta.OnGetNamespace().Return("n")
	tMeta.OnGetLabels().Return(nil)
	tMeta.OnGetAnnotations().Return(nil)
	tMeta.OnGetOwnerReference().Return(metav1.OwnerReference{})
	tMeta.OnGetPlatformResources().Return(&v1.ResourceRequirements{})
	tMeta.OnGetInterruptibleFailureThreshold().Return(2)
	ow := &mocks2.OutputWriter{}
	ow.OnGetOutputPrefixPath().Return("/prefix/")
	ow.OnGetRawOutputPrefix().Return("/raw_prefix/")
	ow.OnGetCheckpointPrefix().Return("/checkpoint")
	ow.OnGetPreviousCheckpointsPrefix().Return("/prev")
	ir := &mocks2.InputReader{}
	ir.OnGetInputPrefixPath().Return("/prefix/")
	ir.OnGetInputPath().Return("/prefix/inputs.pb")
	ir.OnGetMatch(mock.Anything).Return(&core2.LiteralMap{}, nil)
	composedProtobufStore := &stdmocks.ComposedProtobufStore{}
	matchedBy := mock.MatchedBy(func(s storage.DataReference) bool {
		return true
	})
	composedProtobufStore.On("Head", mock.Anything, matchedBy).Return(metadata{true, 0}, nil)
	dataStore := &storage.DataStore{
		ComposedProtobufStore: composedProtobufStore,
		ReferenceConstructor:  &storage.URLPathConstructor{},
	}
	tCtx := &mocks.TaskExecutionContext{}
	tCtx.OnTaskReader().Return(tr)
	tCtx.OnTaskExecutionMetadata().Return(tMeta)
	tCtx.OnOutputWriter().Return(ow)
	tCtx.OnInputReader().Return(ir)
	tCtx.OnDataStore().Return(dataStore)
	return tCtx
}

func TestCheckSubTasksState(t *testing.T) {
	ctx := context.Background()
	subtaskCount := 5
	config := Config{
		MaxArrayJobSize: int64(subtaskCount * 10),
		ResourceConfig: ResourceConfig{
			PrimaryLabel: "p",
			Limit:        subtaskCount,
		},
	}
	fakeKubeClient := mocks.NewFakeKubeClient()
	fakeKubeCache := mocks.NewFakeKubeCache()
	for i := 0; i < subtaskCount; i++ {
		pod := flytek8s.BuildIdentityPod()
		pod.SetName(fmt.Sprintf("notfound-%d", i))
		pod.SetNamespace("a-n-b")
		pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{Name: "foo"})
		pod.Status.Phase = v1.PodRunning
		_ = fakeKubeClient.Create(ctx, pod)
		_ = fakeKubeCache.Create(ctx, pod)
	}
	failureFakeKubeClient := mocks.NewFakeKubeClient()
	failureFakeKubeCache := mocks.NewFakeKubeCache()
	for i := 0; i < subtaskCount; i++ {
		pod := flytek8s.BuildIdentityPod()
		pod.SetName(fmt.Sprintf("notfound-%d", i))
		pod.SetNamespace("a-n-b")
		pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{Name: "foo"})
		pod.Status.Phase = v1.PodFailed
		_ = failureFakeKubeClient.Create(ctx, pod)
		_ = failureFakeKubeCache.Create(ctx, pod)
	}
	t.Run("Launch", func(t *testing.T) {
		// initialize metadata
		kubeClient := mocks.KubeClient{}
		kubeClient.OnGetClient().Return(mocks.NewFakeKubeClient())
		kubeClient.OnGetCache().Return(mocks.NewFakeKubeCache())
		resourceManager := mocks.ResourceManager{}
		resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusGranted, nil)
		tCtx := getMockTaskExecutionContext(ctx, 0)
		tCtx.OnResourceManager().Return(&resourceManager)
		currentState := &arrayCore.State{
			CurrentPhase:         arrayCore.PhaseCheckingSubTaskExecutions,
			ExecutionArraySize:   subtaskCount,
			OriginalArraySize:    int64(subtaskCount),
			OriginalMinSuccesses: int64(subtaskCount),
			ArrayStatus: arraystatus.ArrayStatus{
				Detailed: arrayCore.NewPhasesCompactArray(uint(subtaskCount)), // set all tasks to core.PhaseUndefined
			},
			IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached
		}
		// execute
		newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)
		// validate results
		assert.Nil(t, err)
		p, _ := newState.GetPhase()
		assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())
		resourceManager.AssertNumberOfCalls(t, "AllocateResource", subtaskCount)
		for _, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {
			assert.Equal(t, core.PhaseQueued, core.Phases[subtaskPhaseIndex])
		}
	})
	for i := 1; i <= subtaskCount; i++ {
		t.Run(fmt.Sprintf("LaunchParallelism%d", i), func(t *testing.T) {
			// initialize metadata
			kubeClient := mocks.KubeClient{}
			kubeClient.OnGetClient().Return(mocks.NewFakeKubeClient())
			kubeClient.OnGetCache().Return(mocks.NewFakeKubeCache())
			resourceManager := mocks.ResourceManager{}
			resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusGranted, nil)
			tCtx := getMockTaskExecutionContext(ctx, i)
			tCtx.OnResourceManager().Return(&resourceManager)
			currentState := &arrayCore.State{
				CurrentPhase:         arrayCore.PhaseCheckingSubTaskExecutions,
				ExecutionArraySize:   subtaskCount,
				OriginalArraySize:    int64(subtaskCount),
				OriginalMinSuccesses: int64(subtaskCount),
				ArrayStatus: arraystatus.ArrayStatus{
					Detailed: arrayCore.NewPhasesCompactArray(uint(subtaskCount)), // set all tasks to core.PhaseUndefined
				},
				IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached
			}
			// execute
			newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)
			// validate results
			assert.Nil(t, err)
			p, _ := newState.GetPhase()
			assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())
			executed := 0
			for _, existingPhaseIdx := range newState.GetArrayStatus().Detailed.GetItems() {
				if core.Phases[existingPhaseIdx] != core.PhaseUndefined {
					executed++
				}
			}
			assert.Equal(t, i, executed)
		})
	}
	t.Run("LaunchResourcesExhausted", func(t *testing.T) {
		// initialize metadata
		kubeClient := mocks.KubeClient{}
		kubeClient.OnGetClient().Return(mocks.NewFakeKubeClient())
		kubeClient.OnGetCache().Return(mocks.NewFakeKubeCache())
		resourceManager := mocks.ResourceManager{}
		resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusExhausted, nil)
		tCtx := getMockTaskExecutionContext(ctx, 0)
		tCtx.OnResourceManager().Return(&resourceManager)
		currentState := &arrayCore.State{
			CurrentPhase:         arrayCore.PhaseCheckingSubTaskExecutions,
			ExecutionArraySize:   subtaskCount,
			OriginalArraySize:    int64(subtaskCount),
			OriginalMinSuccesses: int64(subtaskCount),
			ArrayStatus: arraystatus.ArrayStatus{
				Detailed: arrayCore.NewPhasesCompactArray(uint(subtaskCount)), // set all tasks to core.PhaseUndefined
			},
			IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached
		}
		// execute
		newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)
		// validate results
		assert.Nil(t, err)
		p, _ := newState.GetPhase()
		assert.Equal(t, arrayCore.PhaseWaitingForResources.String(), p.String())
		resourceManager.AssertNumberOfCalls(t, "AllocateResource", subtaskCount)
		for _, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {
			assert.Equal(t, core.PhaseWaitingForResources, core.Phases[subtaskPhaseIndex])
		}
		// execute again - with resources available and validate results
		nresourceManager := mocks.ResourceManager{}
		nresourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusGranted, nil)
		ntCtx := getMockTaskExecutionContext(ctx, 0)
		ntCtx.OnResourceManager().Return(&nresourceManager)
		lastState, _, err := LaunchAndCheckSubTasksState(ctx, ntCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", newState)
		assert.Nil(t, err)
		np, _ := lastState.GetPhase()
		assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), np.String())
		resourceManager.AssertNumberOfCalls(t, "AllocateResource", subtaskCount)
		for _, subtaskPhaseIndex := range lastState.GetArrayStatus().Detailed.GetItems() {
			assert.Equal(t, core.PhaseQueued, core.Phases[subtaskPhaseIndex])
		}
	})
	t.Run("LaunchRetryableFailures", func(t *testing.T) {
		// initialize metadata
		kubeClient := mocks.KubeClient{}
		kubeClient.OnGetClient().Return(fakeKubeClient)
		kubeClient.OnGetCache().Return(fakeKubeCache)
		resourceManager := mocks.ResourceManager{}
		resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusGranted, nil)
		tCtx := getMockTaskExecutionContext(ctx, 0)
		tCtx.OnResourceManager().Return(&resourceManager)
		detailed := arrayCore.NewPhasesCompactArray(uint(subtaskCount))
		for i := 0; i < subtaskCount; i++ {
			detailed.SetItem(i, bitarray.Item(core.PhaseRetryableFailure)) // set all tasks to core.PhaseRetryableFailure
		}
		retryAttemptsArray, err := bitarray.NewCompactArray(uint(subtaskCount), bitarray.Item(1))
		assert.NoError(t, err)
		currentState := &arrayCore.State{
			CurrentPhase:         arrayCore.PhaseCheckingSubTaskExecutions,
			ExecutionArraySize:   subtaskCount,
			OriginalArraySize:    int64(subtaskCount),
			OriginalMinSuccesses: int64(subtaskCount),
			ArrayStatus: arraystatus.ArrayStatus{
				Detailed: detailed,
			},
			IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached
			RetryAttempts:  retryAttemptsArray,
		}
		// execute
		newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)
		// validate results
		assert.Nil(t, err)
		p, _ := newState.GetPhase()
		assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())
		resourceManager.AssertNumberOfCalls(t, "AllocateResource", subtaskCount)
		for i, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {
			assert.Equal(t, core.PhaseQueued, core.Phases[subtaskPhaseIndex])
			assert.Equal(t, bitarray.Item(1), newState.RetryAttempts.GetItem(i))
		}
	})
	t.Run("RunningLogLinksAndSubtaskIDs", func(t *testing.T) {
		// initialize metadata
		config := Config{
			MaxArrayJobSize:      100,
			MaxErrorStringLength: 200,
			NamespaceTemplate:    "a-{{.namespace}}-b",
			OutputAssembler: workqueue.Config{
				Workers:            2,
				MaxRetries:         0,
				IndexCacheMaxItems: 100,
			},
			ErrorAssembler: workqueue.Config{
				Workers:            2,
				MaxRetries:         0,
				IndexCacheMaxItems: 100,
			},
			LogConfig: LogConfig{
				Config: logs.LogConfig{
					IsCloudwatchEnabled:   true,
					CloudwatchTemplateURI: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.{{ .podName }};streamFilter=typeLogStreamPrefix",
					IsKubernetesEnabled:   true,
					KubernetesTemplateURI: "k8s/log/{{.namespace}}/{{.podName}}/pod?namespace={{.namespace}}",
				}},
		}
		kubeClient := mocks.KubeClient{}
		kubeClient.OnGetClient().Return(fakeKubeClient)
		kubeClient.OnGetCache().Return(fakeKubeCache)
		resourceManager := mocks.ResourceManager{}
		resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusExhausted, nil)
		tCtx := getMockTaskExecutionContext(ctx, 0)
		tCtx.OnResourceManager().Return(&resourceManager)
		detailed := arrayCore.NewPhasesCompactArray(uint(subtaskCount))
		for i := 0; i < subtaskCount; i++ {
			detailed.SetItem(i, bitarray.Item(core.PhaseRunning)) // set all tasks to core.PhaseRunning
		}
		currentState := &arrayCore.State{
			CurrentPhase:         arrayCore.PhaseCheckingSubTaskExecutions,
			ExecutionArraySize:   subtaskCount,
			OriginalArraySize:    int64(subtaskCount),
			OriginalMinSuccesses: int64(subtaskCount),
			ArrayStatus: arraystatus.ArrayStatus{
				Detailed: detailed,
			},
			IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached
		}
		// execute
		newState, externalResources, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)
		// validate results
		assert.Nil(t, err)
		p, _ := newState.GetPhase()
		assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())
		resourceManager.AssertNumberOfCalls(t, "AllocateResource", 0)
		resourceManager.AssertNumberOfCalls(t, "ReleaseResource", 0)
		assert.Equal(t, subtaskCount, len(externalResources))
		for i := 0; i < subtaskCount; i++ {
			externalResource := externalResources[i]
			assert.Equal(t, fmt.Sprintf("notfound-%d", i), externalResource.ExternalID)
			logLinks := externalResource.Logs
			assert.Equal(t, 2, len(logLinks))
			assert.Equal(t, fmt.Sprintf("Kubernetes Logs #0-%d", i), logLinks[0].Name)
			assert.Equal(t, fmt.Sprintf("k8s/log/a-n-b/notfound-%d/pod?namespace=a-n-b", i), logLinks[0].Uri)
			assert.Equal(t, fmt.Sprintf("Cloudwatch Logs #0-%d", i), logLinks[1].Name)
			assert.Equal(t, fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.notfound-%d;streamFilter=typeLogStreamPrefix", i), logLinks[1].Uri)
		}
	})
	t.Run("RunningRetryableFailures", func(t *testing.T) {
		// initialize metadata
		kubeClient := mocks.KubeClient{}
		kubeClient.OnGetClient().Return(failureFakeKubeClient)
		kubeClient.OnGetCache().Return(failureFakeKubeCache)
		resourceManager := mocks.ResourceManager{}
		resourceManager.OnReleaseResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
		tCtx := getMockTaskExecutionContext(ctx, 0)
		tCtx.OnResourceManager().Return(&resourceManager)
		detailed := arrayCore.NewPhasesCompactArray(uint(subtaskCount))
		for i := 0; i < subtaskCount; i++ {
			detailed.SetItem(i, bitarray.Item(core.PhaseRunning)) // set all tasks to core.PhaseRunning
		}
		retryAttemptsArray, err := bitarray.NewCompactArray(uint(subtaskCount), bitarray.Item(1))
		assert.NoError(t, err)
		currentState := &arrayCore.State{
			CurrentPhase:         arrayCore.PhaseCheckingSubTaskExecutions,
			ExecutionArraySize:   subtaskCount,
			OriginalArraySize:    int64(subtaskCount),
			OriginalMinSuccesses: int64(subtaskCount),
			ArrayStatus: arraystatus.ArrayStatus{
				Detailed: detailed,
			},
			IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached
			RetryAttempts:  retryAttemptsArray,
		}
		// execute
		newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, tCtx.DataStore(), "/prefix/", "/prefix-sand/", currentState)
		// validate results
		assert.Nil(t, err)
		p, _ := newState.GetPhase()
		assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())
		resourceManager.AssertNumberOfCalls(t, "ReleaseResource", subtaskCount)
		for _, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {
			assert.Equal(t, core.PhaseRetryableFailure, core.Phases[subtaskPhaseIndex])
		}
	})
	t.Run("RunningPermanentFailures", func(t *testing.T) {
		// initialize metadata
		kubeClient := mocks.KubeClient{}
		kubeClient.OnGetClient().Return(failureFakeKubeClient)
		kubeClient.OnGetCache().Return(failureFakeKubeCache)
		resourceManager := mocks.ResourceManager{}
		resourceManager.OnReleaseResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
		tCtx := getMockTaskExecutionContext(ctx, 0)
		tCtx.OnResourceManager().Return(&resourceManager)
		detailed := arrayCore.NewPhasesCompactArray(uint(subtaskCount))
		for i := 0; i < subtaskCount; i++ {
			detailed.SetItem(i, bitarray.Item(core.PhaseRunning)) // set all tasks to core.PhaseRunning
		}
		retryAttemptsArray, err := bitarray.NewCompactArray(uint(subtaskCount), bitarray.Item(1))
		assert.NoError(t, err)
		for i := 0; i < subtaskCount; i++ {
			retryAttemptsArray.SetItem(i, bitarray.Item(1))
		}
		currentState := &arrayCore.State{
			CurrentPhase:         arrayCore.PhaseCheckingSubTaskExecutions,
			ExecutionArraySize:   subtaskCount,
			OriginalArraySize:    int64(subtaskCount),
			OriginalMinSuccesses: int64(subtaskCount),
			ArrayStatus: arraystatus.ArrayStatus{
				Detailed: detailed,
			},
			IndexesToCache: arrayCore.InvertBitSet(bitarray.NewBitSet(uint(subtaskCount)), uint(subtaskCount)), // set all tasks to be cached
			RetryAttempts:  retryAttemptsArray,
		}
		// execute
		newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, tCtx.DataStore(), "/prefix/", "/prefix-sand/", currentState)
		// validate results
		assert.Nil(t, err)
		p, _ := newState.GetPhase()
		assert.Equal(t, arrayCore.PhaseWriteToDiscoveryThenFail.String(), p.String())
		resourceManager.AssertNumberOfCalls(t, "ReleaseResource", subtaskCount)
		for _, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {
			assert.Equal(t, core.PhasePermanentFailure, core.Phases[subtaskPhaseIndex])
		}
	})
}
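The pattern these tests exercise is a round trip through flytestdlib's CompactArray: phases are packed with SetItem and decoded back with GetItems. Below is a minimal sketch of that round trip, assuming only the github.com/flyteorg/flytestdlib/bitarray package already shown in the imports above.

package main

import (
	"fmt"

	"github.com/flyteorg/flytestdlib/bitarray"
)

func main() {
	// Five entries, each able to hold values up to 7.
	arr, err := bitarray.NewCompactArray(5, bitarray.Item(7))
	if err != nil {
		panic(err)
	}
	arr.SetItem(2, bitarray.Item(4))

	// GetItems decodes every packed entry back into a plain slice; this is
	// the same call the assertions above loop over to check subtask phases.
	for i, item := range arr.GetItems() {
		fmt.Printf("index %d -> %d\n", i, item)
	}
}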


management.go

Source: management.go (GitHub)


package k8s

import (
	"context"
	"fmt"
	"time"

	idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/flyteorg/flyteplugins/go/tasks/errors"
	"github.com/flyteorg/flyteplugins/go/tasks/logs"
	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io"
	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"
	"github.com/flyteorg/flyteplugins/go/tasks/plugins/array"
	"github.com/flyteorg/flyteplugins/go/tasks/plugins/array/arraystatus"
	arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core"
	"github.com/flyteorg/flyteplugins/go/tasks/plugins/array/errorcollector"
	"github.com/flyteorg/flytestdlib/bitarray"
	"github.com/flyteorg/flytestdlib/logger"
	"github.com/flyteorg/flytestdlib/storage"
)

// allocateResource attempts to allot resources for the specified parameter with the
// TaskExecutionContext's ResourceManager.
func allocateResource(ctx context.Context, tCtx core.TaskExecutionContext, config *Config, podName string) (core.AllocationStatus, error) {
	if !IsResourceConfigSet(config.ResourceConfig) {
		return core.AllocationStatusGranted, nil
	}
	resourceNamespace := core.ResourceNamespace(config.ResourceConfig.PrimaryLabel)
	resourceConstraintSpec := core.ResourceConstraintsSpec{
		ProjectScopeResourceConstraint:   nil,
		NamespaceScopeResourceConstraint: nil,
	}
	allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, resourceNamespace, podName, resourceConstraintSpec)
	if err != nil {
		return core.AllocationUndefined, err
	}
	return allocationStatus, nil
}

// deallocateResource attempts to release resources for the specified parameter with the
// TaskExecutionContext's ResourceManager.
func deallocateResource(ctx context.Context, tCtx core.TaskExecutionContext, config *Config, podName string) error {
	if !IsResourceConfigSet(config.ResourceConfig) {
		return nil
	}
	resourceNamespace := core.ResourceNamespace(config.ResourceConfig.PrimaryLabel)
	err := tCtx.ResourceManager().ReleaseResource(ctx, resourceNamespace, podName)
	if err != nil {
		logger.Errorf(ctx, "Error releasing token [%s]. error %s", podName, err)
		return err
	}
	return nil
}

// LaunchAndCheckSubTasksState iterates over each subtask performing operations to transition them
// to a terminal state. This may include creating new k8s resources, monitoring existing k8s
// resources, retrying failed attempts, or declaring a permanent failure among others.
func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient,
	config *Config, dataStore *storage.DataStore, outputPrefix, baseOutputDataSandbox storage.DataReference, currentState *arrayCore.State) (
	newState *arrayCore.State, externalResources []*core.ExternalResource, err error) {
	if int64(currentState.GetExecutionArraySize()) > config.MaxArrayJobSize {
		ee := fmt.Errorf("array size > max allowed. Requested [%v]. Allowed [%v]", currentState.GetExecutionArraySize(), config.MaxArrayJobSize)
		logger.Info(ctx, ee)
		currentState = currentState.SetPhase(arrayCore.PhasePermanentFailure, 0).SetReason(ee.Error())
		return currentState, externalResources, nil
	}
	newState = currentState
	messageCollector := errorcollector.NewErrorMessageCollector()
	newArrayStatus := &arraystatus.ArrayStatus{
		Summary:  arraystatus.ArraySummary{},
		Detailed: arrayCore.NewPhasesCompactArray(uint(currentState.GetExecutionArraySize())),
	}
	externalResources = make([]*core.ExternalResource, 0, len(currentState.GetArrayStatus().Detailed.GetItems()))
	// If we have arrived at this state for the first time then currentState has not been
	// initialized with number of sub tasks.
	if len(currentState.GetArrayStatus().Detailed.GetItems()) == 0 {
		currentState.ArrayStatus = *newArrayStatus
	}
	// If the current State is newly minted then we must initialize RetryAttempts to track how many
	// times each subtask is executed.
	if len(currentState.RetryAttempts.GetItems()) == 0 {
		count := uint(currentState.GetExecutionArraySize())
		maxValue := bitarray.Item(tCtx.TaskExecutionMetadata().GetMaxAttempts())
		retryAttemptsArray, err := bitarray.NewCompactArray(count, maxValue)
		if err != nil {
			logger.Errorf(ctx, "Failed to create attempts compact array with [count: %v, maxValue: %v]", count, maxValue)
			return currentState, externalResources, nil
		}
		// Initialize subtask retryAttempts to 0 so that, in tandem with the podName logic, we
		// maintain backwards compatibility.
		for i := 0; i < currentState.GetExecutionArraySize(); i++ {
			retryAttemptsArray.SetItem(i, 0)
		}
		currentState.RetryAttempts = retryAttemptsArray
	}
	// If the current State is newly minted then we must initialize SystemFailures to track how many
	// times the subtask failed due to system issues, this is necessary to correctly evaluate
	// interruptible subtasks.
	if len(currentState.SystemFailures.GetItems()) == 0 {
		count := uint(currentState.GetExecutionArraySize())
		maxValue := bitarray.Item(tCtx.TaskExecutionMetadata().GetInterruptibleFailureThreshold())
		systemFailuresArray, err := bitarray.NewCompactArray(count, maxValue)
		if err != nil {
			logger.Errorf(ctx, "Failed to create system failures array with [count: %v, maxValue: %v]", count, maxValue)
			return currentState, externalResources, err
		}
		for i := 0; i < currentState.GetExecutionArraySize(); i++ {
			systemFailuresArray.SetItem(i, 0)
		}
		currentState.SystemFailures = systemFailuresArray
	}
	// initialize log plugin
	logPlugin, err := logs.InitializeLogPlugins(&config.LogConfig.Config)
	if err != nil {
		return currentState, externalResources, err
	}
	// identify max parallelism
	taskTemplate, err := tCtx.TaskReader().Read(ctx)
	if err != nil {
		return currentState, externalResources, err
	} else if taskTemplate == nil {
		return currentState, externalResources, errors.Errorf(errors.BadTaskSpecification, "Required value not set, taskTemplate is nil")
	}
	arrayJob, err := arrayCore.ToArrayJob(taskTemplate.GetCustom(), taskTemplate.TaskTypeVersion)
	if err != nil {
		return currentState, externalResources, err
	}
	currentParallelism := 0
	maxParallelism := int(arrayJob.Parallelism)
	currentSubTaskPhaseHash, err := currentState.GetArrayStatus().HashCode()
	if err != nil {
		return currentState, externalResources, err
	}
	for childIdx, existingPhaseIdx := range currentState.GetArrayStatus().Detailed.GetItems() {
		existingPhase := core.Phases[existingPhaseIdx]
		retryAttempt := currentState.RetryAttempts.GetItem(childIdx)
		if existingPhase == core.PhaseRetryableFailure {
			retryAttempt++
			newState.RetryAttempts.SetItem(childIdx, retryAttempt)
		} else if existingPhase.IsTerminal() {
			newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(existingPhase))
			continue
		}
		originalIdx := arrayCore.CalculateOriginalIndex(childIdx, newState.GetIndexesToCache())
		systemFailures := currentState.SystemFailures.GetItem(childIdx)
		stCtx, err := NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, childIdx, originalIdx, retryAttempt, systemFailures)
		if err != nil {
			return currentState, externalResources, err
		}
		podName := stCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
		// depending on the existing subtask phase we either launch a new k8s resource or monitor
		// an existing instance
		var phaseInfo core.PhaseInfo
		var perr error
		if existingPhase == core.PhaseUndefined || existingPhase == core.PhaseWaitingForResources || existingPhase == core.PhaseRetryableFailure {
			// attempt to allocateResource
			allocationStatus, err := allocateResource(ctx, stCtx, config, podName)
			if err != nil {
				logger.Errorf(ctx, "Resource manager failed for TaskExecId [%s] token [%s]. error %s",
					stCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), podName, err)
				return currentState, externalResources, err
			}
			logger.Infof(ctx, "Allocation result for [%s] is [%s]", podName, allocationStatus)
			if allocationStatus != core.AllocationStatusGranted {
				phaseInfo = core.PhaseInfoWaitingForResourcesInfo(time.Now(), core.DefaultPhaseVersion, "Exceeded ResourceManager quota", nil)
			} else {
				phaseInfo, perr = launchSubtask(ctx, stCtx, config, kubeClient)
				// if launchSubtask fails we attempt to deallocate the (previously allocated)
				// resource to mitigate leaks
				if perr != nil {
					perr = deallocateResource(ctx, stCtx, config, podName)
					if perr != nil {
						logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", podName, err)
					}
				}
			}
		} else {
			phaseInfo, perr = getSubtaskPhaseInfo(ctx, stCtx, config, kubeClient, logPlugin)
		}
		if perr != nil {
			return currentState, externalResources, perr
		}
		if phaseInfo.Err() != nil {
			messageCollector.Collect(childIdx, phaseInfo.Err().String())
			// If the service reported an error but there is no error.pb written, write one with the
			// service-provided error message.
			or, err := array.ConstructOutputReader(ctx, dataStore, outputPrefix, baseOutputDataSandbox, originalIdx)
			if err != nil {
				return currentState, externalResources, err
			}
			if hasErr, err := or.IsError(ctx); err != nil {
				return currentState, externalResources, err
			} else if !hasErr {
				// The subtask has not produced an error.pb, write one.
				ow, err := array.ConstructOutputWriter(ctx, dataStore, outputPrefix, baseOutputDataSandbox, originalIdx)
				if err != nil {
					return currentState, externalResources, err
				}
				if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, &io.ExecutionError{
					ExecutionError: phaseInfo.Err(),
					IsRecoverable:  phaseInfo.Phase() != core.PhasePermanentFailure,
				})); err != nil {
					return currentState, externalResources, err
				}
			}
		}
		if phaseInfo.Err() != nil && phaseInfo.Err().GetKind() == idlCore.ExecutionError_SYSTEM {
			newState.SystemFailures.SetItem(childIdx, systemFailures+1)
		} else {
			newState.SystemFailures.SetItem(childIdx, systemFailures)
		}
		// process subtask phase
		actualPhase := phaseInfo.Phase()
		if actualPhase.IsSuccess() {
			actualPhase, err = array.CheckTaskOutput(ctx, dataStore, outputPrefix, baseOutputDataSandbox, childIdx, originalIdx)
			if err != nil {
				return currentState, externalResources, err
			}
		}
		if actualPhase == core.PhaseRetryableFailure && uint32(retryAttempt+1) >= stCtx.TaskExecutionMetadata().GetMaxAttempts() {
			// If we see a retryable failure we must check if the number of retries exceeds the maximum
			// attempts. If so, transition to a permanent failure so that it is not attempted again.
			actualPhase = core.PhasePermanentFailure
		}
		newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(actualPhase))
		if actualPhase.IsTerminal() {
			err = deallocateResource(ctx, stCtx, config, podName)
			if err != nil {
				logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", podName, err)
				return currentState, externalResources, err
			}
			err = finalizeSubtask(ctx, stCtx, config, kubeClient)
			if err != nil {
				logger.Errorf(ctx, "Error finalizing resource [%s] in Finalize [%s]", podName, err)
				return currentState, externalResources, err
			}
		}
		// process phaseInfo
		var logLinks []*idlCore.TaskLog
		if phaseInfo.Info() != nil {
			logLinks = phaseInfo.Info().Logs
		}
		externalResources = append(externalResources, &core.ExternalResource{
			ExternalID:   podName,
			Index:        uint32(originalIdx),
			Logs:         logLinks,
			RetryAttempt: uint32(retryAttempt),
			Phase:        actualPhase,
		})
		// validate parallelism
		if !actualPhase.IsTerminal() || actualPhase == core.PhaseRetryableFailure {
			currentParallelism++
		}
		if maxParallelism != 0 && currentParallelism >= maxParallelism {
			break
		}
	}
	// compute task phase from array status summary
	for _, phaseIdx := range newArrayStatus.Detailed.GetItems() {
		newArrayStatus.Summary.Inc(core.Phases[phaseIdx])
	}
	phase := arrayCore.SummaryToPhase(ctx, currentState.GetOriginalMinSuccesses()-currentState.GetOriginalArraySize()+int64(currentState.GetExecutionArraySize()), newArrayStatus.Summary)
	// process new state
	newState = newState.SetArrayStatus(*newArrayStatus)
	if phase == arrayCore.PhaseWriteToDiscoveryThenFail {
		errorMsg := messageCollector.Summary(GetConfig().MaxErrorStringLength)
		newState = newState.SetReason(errorMsg)
	}
	_, version := currentState.GetPhase()
	if phase == arrayCore.PhaseCheckingSubTaskExecutions {
		newSubTaskPhaseHash, err := newState.GetArrayStatus().HashCode()
		if err != nil {
			return currentState, externalResources, err
		}
		if newSubTaskPhaseHash != currentSubTaskPhaseHash {
			version++
		}
		newState = newState.SetPhase(phase, version).SetReason("Task is still running")
	} else {
		newState = newState.SetPhase(phase, version)
	}
	return newState, externalResources, nil
}

// TerminateSubTasks performs operations to gracefully terminate all subtasks. This may include
// aborting and finalizing active k8s resources.
func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient, config *Config,
	terminateFunction func(context.Context, SubTaskExecutionContext, *Config, core.KubeClient) error, currentState *arrayCore.State) error {
	taskTemplate, err := tCtx.TaskReader().Read(ctx)
	if err != nil {
		return err
	} else if taskTemplate == nil {
		return errors.Errorf(errors.BadTaskSpecification, "Required value not set, taskTemplate is nil")
	}
	messageCollector := errorcollector.NewErrorMessageCollector()
	for childIdx, existingPhaseIdx := range currentState.GetArrayStatus().Detailed.GetItems() {
		existingPhase := core.Phases[existingPhaseIdx]
		retryAttempt := uint64(0)
		if childIdx < len(currentState.RetryAttempts.GetItems()) {
			// we can use RetryAttempts if it has been initialized, otherwise stay with default 0
			retryAttempt = currentState.RetryAttempts.GetItem(childIdx)
		}
		// return immediately if subtask has completed or not yet started
		if existingPhase.IsTerminal() || existingPhase == core.PhaseUndefined {
			continue
		}
		originalIdx := arrayCore.CalculateOriginalIndex(childIdx, currentState.GetIndexesToCache())
		stCtx, err := NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, childIdx, originalIdx, retryAttempt, 0)
		if err != nil {
			return err
		}
		err = terminateFunction(ctx, stCtx, config, kubeClient)
		if err != nil {
			messageCollector.Collect(childIdx, err.Error())
		}
	}
	if messageCollector.Length() > 0 {
		return fmt.Errorf(messageCollector.Summary(config.MaxErrorStringLength))
	}
	return nil
}
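TerminateSubTasks receives its terminate behavior as a function argument, which is how callers choose between aborting and finalizing subtasks. Below is a hedged sketch of wiring it up; the abortSubtask name is an assumption for illustration, and only the signature comes from the code above.

// terminateAll aborts every subtask that is neither terminal nor unstarted.
// abortSubtask is an assumed helper matching the terminateFunction signature
// func(context.Context, SubTaskExecutionContext, *Config, core.KubeClient) error.
func terminateAll(ctx context.Context, tCtx core.TaskExecutionContext,
	kubeClient core.KubeClient, config *Config, state *arrayCore.State) error {
	return TerminateSubTasks(ctx, tCtx, kubeClient, config, abortSubtask, state)
}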


zz_generated.managedlist.go

Source: zz_generated.managedlist.go (GitHub)


/*
Copyright 2021 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Code generated by angryjet. DO NOT EDIT.

package v1alpha1

import resource "github.com/crossplane/crossplane-runtime/pkg/resource"

// GetItems of this BindingsList.
func (l *BindingsList) GetItems() []resource.Managed {
	items := make([]resource.Managed, len(l.Items))
	for i := range l.Items {
		items[i] = &l.Items[i]
	}
	return items
}

// GetItems of this ExecutionConfigList.
func (l *ExecutionConfigList) GetItems() []resource.Managed {
	items := make([]resource.Managed, len(l.Items))
	for i := range l.Items {
		items[i] = &l.Items[i]
	}
	return items
}

// GetItems of this ExecutionList.
func (l *ExecutionList) GetItems() []resource.Managed {
	items := make([]resource.Managed, len(l.Items))
	for i := range l.Items {
		items[i] = &l.Items[i]
	}
	return items
}

// GetItems of this FlowList.
func (l *FlowList) GetItems() []resource.Managed {
	items := make([]resource.Managed, len(l.Items))
	for i := range l.Items {
		items[i] = &l.Items[i]
	}
	return items
}

// GetItems of this SubflowList.
func (l *SubflowList) GetItems() []resource.Managed {
	items := make([]resource.Managed, len(l.Items))
	for i := range l.Items {
		items[i] = &l.Items[i]
	}
	return items
}
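Each generated GetItems above adapts a concrete list type to crossplane-runtime's resource.ManagedList interface, so generic controller code can walk any of these lists the same way. A small sketch of such a consumer, assuming only crossplane-runtime and the package above:

package v1alpha1

import (
	"fmt"

	resource "github.com/crossplane/crossplane-runtime/pkg/resource"
)

// printItemNames accepts an ExecutionList, FlowList, or any other type whose
// generated GetItems satisfies resource.ManagedList.
func printItemNames(list resource.ManagedList) {
	for _, managed := range list.GetItems() {
		fmt.Println(managed.GetName())
	}
}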


getItems

Using AI Code Generation


package main

import (
	"fmt"
	"sync"
)

func main() {
	// wg blocks main until the goroutine below signals completion.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("Hello")
	}()
	wg.Wait()
}


getItems

Using AI Code Generation


package main

import "fmt"

// Execution's fields were stripped in the original snippet.
type Execution struct{}

type Item struct {
	// Field types are reconstructed from the literals in getItems below.
	ItemId, ItemExecutionId                                int
	ItemName, ItemStatus, ItemDuration                     string
	ItemStartDate, ItemStartTime, ItemEndDate, ItemEndTime string
}

func getItems() []Item {
	item1 := Item{ItemId: 1, ItemName: "Item1", ItemStatus: "Running", ItemDuration: "00:00:00", ItemStartDate: "2016-04-20", ItemStartTime: "09:00:00", ItemEndDate: "2016-04-20", ItemEndTime: "09:00:00", ItemExecutionId: 1}
	item2 := Item{ItemId: 2, ItemName: "Item2", ItemStatus: "Running", ItemDuration: "00:00:00", ItemStartDate: "2016-04-20", ItemStartTime: "09:00:00", ItemEndDate: "2016-04-20", ItemEndTime: "09:00:00", ItemExecutionId: 1}
	item3 := Item{ItemId: 3, ItemName: "Item3", ItemStatus: "Running", ItemDuration: "00:00:00", ItemStartDate: "2016-04-20", ItemStartTime: "09:00:00", ItemEndDate: "2016-04-20", ItemEndTime: "09:00:00", ItemExecutionId: 1}
	// item4 was truncated in the original snippet; completed following the pattern above.
	item4 := Item{ItemId: 4, ItemName: "Item4", ItemStatus: "Running", ItemDuration: "00:00:00", ItemStartDate: "2016-04-20", ItemStartTime: "09:00:00", ItemEndDate: "2016-04-20", ItemEndTime: "09:00:00", ItemExecutionId: 1}
	return []Item{item1, item2, item3, item4}
}

func main() {
	fmt.Println(getItems())
}


getItems

Using AI Code Generation


package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

type execution struct {
}

// GetItems must be exported: reflect cannot see unexported methods, so the
// original lowercase getItems would make the MethodByName lookups fail.
func (e execution) GetItems() []string {
	return []string{"item1", "item2", "item3"}
}

func main() {
	e := execution{}
	method := reflect.ValueOf(e).MethodByName("GetItems")
	name := runtime.FuncForPC(method.Pointer()).Name()
	className := strings.Split(name, ".")[0] // the package path, e.g. "main"
	t := reflect.TypeOf(e)
	m, _ := t.MethodByName("GetItems")
	methodType := m.Type
	argumentCount := methodType.NumIn() // includes the receiver
	returnCount := methodType.NumOut()
	fmt.Println("Class Name:", className)
	fmt.Println("Method Name:", "GetItems")
	fmt.Println("Argument Count:", argumentCount)
	fmt.Println("Return Count:", returnCount)
}
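Two details make this work: reflect only exposes exported methods, so the method must be named GetItems rather than getItems (MethodByName on an unexported name returns an invalid Value, and calling Pointer on it would panic), and for a method looked up on a type, NumIn counts the receiver as the first input, so the program reports an argument count of 1.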


getItems

Using AI Code Generation


1import "fmt"2func main() {3    fmt.Println("Hello, playground")4    exec.getItems()5}6import "fmt"7type execution struct {8}9func (e execution) getItems() {10    fmt.Println("getItems")11    fmt.Println(e.items)12}13import "fmt"14func main() {15    fmt.Println("Hello, playground")16    exec.getItems()17}18import "fmt"19type execution struct {20}21func (e execution) getItems() {22    fmt.Println("getItems")23    fmt.Println(e.items)24}25import "fmt"26func main() {27    fmt.Println("Hello, playground")28    exec.getItems()29}30import "


getItems

Using AI Code Generation


1import "fmt"2func main() {3    fmt.Println("Hello, playground")4    execution := new(Execution)5    execution.getItems()6}7import "fmt"8type Execution struct {9}10func (e *Execution) getItems() {11    e.items = []string{"item1", "item2", "item3"}12    fmt.Println(e.items)13}


getItems

Using AI Code Generation


package main

import "fmt"

// The original snippet qualified the literal as execution.GetItems{},
// implying a separate execution package; folding the type into one file
// keeps the example self-contained and compilable.
type GetItems struct{}

func (getItems GetItems) GetItems() []string {
	items := []string{"item1", "item2"}
	return items // the original snippet was missing this return
}

func main() {
	var executionObj = GetItems{}
	var items = executionObj.GetItems()
	fmt.Println(items)
}


getItems

Using AI Code Generation


package main

import (
	"html/template"
	"net/http"

	"github.com/gorilla/mux"
)

var tmpl = template.Must(template.ParseGlob("form/*"))

func Index(w http.ResponseWriter, r *http.Request) {
	tmpl.ExecuteTemplate(w, "Index", nil)
}

func getItems(w http.ResponseWriter, r *http.Request) {
	// The import for the execution package was stripped from the original
	// snippet; GetItems is expected to return items exposing the ID, Name,
	// and Price fields used by the template below.
	items := execution.GetItems()
	tmpl.ExecuteTemplate(w, "getItems", items)
}

func main() {
	router := mux.NewRouter()
	router.HandleFunc("/", Index)
	router.HandleFunc("/getItems", getItems).Methods("GET")
	http.ListenAndServe(":8000", router)
}

{{define "getItems"}}
<table>
	{{range .}}
	<tr>
		<td>{{.ID}}</td>
		<td>{{.Name}}</td>
		<td>{{.Price}}</td>
	</tr>
	{{end}}
</table>
{{end}}

{{define "Index"}}
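With the two templates saved under the form/ directory that ParseGlob reads, running the program and visiting http://localhost:8000/getItems renders the item table.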


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, covering everything from setting up the prerequisites for your first automation test to following best practices and diving into advanced test scenarios. The Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.


YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Gauge automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.
