How to use Reconcile method of event Package

Best Testkube code snippet using event.Reconcile

worker.go

Source:worker.go Github

copy

Full Screen

...69}70// processWorkItem processes one work item according to its type71func (w *worker) processItem(item interface{}) error {72 switch item.(type) {73 case *ReconcileChi:74 reconcile, _ := item.(*ReconcileChi)75 switch reconcile.cmd {76 case reconcileAdd:77 return w.updateCHI(nil, reconcile.new)78 case reconcileUpdate:79 return w.updateCHI(reconcile.old, reconcile.new)80 case reconcileDelete:81 return w.deleteCHI(reconcile.old)82 }83 // Unknown item type, don't know what to do with it84 // Just skip it and behave like it never existed85 utilruntime.HandleError(fmt.Errorf("unexpected reconcile - %#v", reconcile))86 return nil87 case *ReconcileChit:88 reconcile, _ := item.(*ReconcileChit)89 switch reconcile.cmd {90 case reconcileAdd:91 return w.c.addChit(reconcile.new)92 case reconcileUpdate:93 return w.c.updateChit(reconcile.old, reconcile.new)94 case reconcileDelete:95 return w.c.deleteChit(reconcile.old)96 }97 // Unknown item type, don't know what to do with it98 // Just skip it and behave like it never existed99 utilruntime.HandleError(fmt.Errorf("unexpected reconcile - %#v", reconcile))100 return nil101 case *ReconcileChopConfig:102 reconcile, _ := item.(*ReconcileChopConfig)103 switch reconcile.cmd {104 case reconcileAdd:105 return w.c.addChopConfig(reconcile.new)106 case reconcileUpdate:107 return w.c.updateChopConfig(reconcile.old, reconcile.new)108 case reconcileDelete:109 return w.c.deleteChopConfig(reconcile.old)110 }111 // Unknown item type, don't know what to do with it112 // Just skip it and behave like it never existed113 utilruntime.HandleError(fmt.Errorf("unexpected reconcile - %#v", reconcile))114 return nil115 case *DropDns:116 drop, _ := item.(*DropDns)117 if chi, err := w.createCHIFromObjectMeta(drop.initiator); err == nil {118 w.a.V(2).Info("endpointsInformer UpdateFunc(%s/%s) flushing DNS for CHI %s", drop.initiator.Namespace, drop.initiator.Name, chi.Name)119 _ = w.schemer.CHIDropDnsCache(chi)120 } else {121 w.a.Error("endpointsInformer UpdateFunc(%s/%s) unable to find CHI by %v", drop.initiator.Namespace, drop.initiator.Name, drop.initiator.Labels)122 }123 return nil124 }125 // Unknown item type, don't know what to do with it126 // Just skip it and behave like it never existed127 utilruntime.HandleError(fmt.Errorf("unexpected item in the queue - %#v", item))128 return nil129}130func (w *worker) normalize(chi *chop.ClickHouseInstallation) *chop.ClickHouseInstallation {131 var withDefaultCluster bool132 if chi == nil {133 chi = &chop.ClickHouseInstallation{}134 withDefaultCluster = false135 } else {136 withDefaultCluster = true137 }138 chi, err := w.normalizer.CreateTemplatedCHI(chi, withDefaultCluster)139 if err != nil {140 w.a.WithEvent(chi, eventActionReconcile, eventReasonReconcileFailed).141 WithStatusError(chi).142 Error("FAILED to normalize CHI : %v", err)143 }144 return chi145}146// updateCHI sync CHI which was already created earlier147func (w *worker) updateCHI(old, new *chop.ClickHouseInstallation) error {148 if (old != nil) && (new != nil) && (old.ObjectMeta.ResourceVersion == new.ObjectMeta.ResourceVersion) {149 w.a.V(2).Info("updateCHI(%s/%s): ResourceVersion did not change: %s", new.Namespace, new.Name, new.ObjectMeta.ResourceVersion)150 // No need to react151 return nil152 }153 old = w.normalize(old)154 new = w.normalize(new)155 actionPlan := NewActionPlan(old, new)156 if !actionPlan.HasActionsToDo() {157 // Nothing to do - no changes found - no need to react158 w.a.V(2).Info("updateCHI(%s/%s) - no changes found", new.Namespace, new.Name)159 return nil160 }161 // Write desired normalized CHI with initialized .Status, so it would be possible to monitor progress162 (&new.Status).ReconcileStart(actionPlan.GetRemovedHostsNum())163 if err := w.c.updateCHIObjectStatus(new, false); err != nil {164 w.a.V(1).Info("UNABLE to write normalized CHI (%s/%s). It can trigger update action again. Error: %q", new.Namespace, new.Name, err)165 return nil166 }167 w.a.V(1).168 WithEvent(new, eventActionReconcile, eventReasonReconcileStarted).169 WithStatusAction(new).170 Info("updateCHI(%s/%s) reconcile started", new.Namespace, new.Name)171 w.a.V(2).Info("updateCHI(%s/%s) - action plan\n%s\n", new.Namespace, new.Name, actionPlan.String())172 if err := w.reconcile(new); err != nil {173 w.a.WithEvent(new, eventActionReconcile, eventReasonReconcileFailed).174 WithStatusError(new).175 Error("FAILED update: %v", err)176 return nil177 }178 w.a.V(1).179 WithEvent(new, eventActionReconcile, eventReasonReconcileInProgress).180 WithStatusAction(new).181 Info("updateCHI(%s/%s) remove scheduled for deletion items", new.Namespace, new.Name)182 // Remove deleted items183 actionPlan.WalkRemoved(184 func(cluster *chop.ChiCluster) {185 _ = w.deleteCluster(cluster)186 },187 func(shard *chop.ChiShard) {188 _ = w.deleteShard(shard)189 },190 func(host *chop.ChiHost) {191 _ = w.deleteHost(host)192 },193 )194 w.a.V(1).195 WithEvent(new, eventActionReconcile, eventReasonReconcileInProgress).196 WithStatusAction(new).197 Info("updateCHI(%s/%s) update monitoring list", new.Namespace, new.Name)198 w.c.updateWatch(new.Namespace, new.Name, chopmodel.CreatePodFQDNsOfChi(new))199 // Update CHI object200 (&new.Status).ReconcileComplete()201 _ = w.c.updateCHIObjectStatus(new, false)202 w.a.V(1).203 WithEvent(new, eventActionReconcile, eventReasonReconcileCompleted).204 WithStatusActions(new).205 Info("updateCHI(%s/%s) reconcile completed", new.Namespace, new.Name)206 return nil207}208// reconcile reconciles ClickHouseInstallation209func (w *worker) reconcile(chi *chop.ClickHouseInstallation) error {210 w.creator = chopmodel.NewCreator(w.c.chop, chi)211 return chi.WalkTillError(212 w.reconcileCHI,213 w.reconcileCluster,214 w.reconcileShard,215 w.reconcileHost,216 )217}218// reconcileCHI reconciles CHI global objects219func (w *worker) reconcileCHI(chi *chop.ClickHouseInstallation) error {220 // 1. CHI Service221 service := w.creator.CreateServiceCHI()222 if err := w.reconcileService(chi, service); err != nil {223 w.a.WithEvent(chi, eventActionReconcile, eventReasonReconcileFailed).224 WithStatusAction(chi).225 WithStatusError(chi).226 Error("Reconcile CHI %s failed to reconcile Service %s", chi.Name, service.Name)227 return err228 }229 // 2. CHI ConfigMaps230 // ConfigMap common for all resources in CHI231 // contains several sections, mapped as separated chopConfig files,232 // such as remote servers, zookeeper setup, etc233 configMapCommon := w.creator.CreateConfigMapCHICommon()234 if err := w.reconcileConfigMap(chi, configMapCommon); err != nil {235 w.a.WithEvent(chi, eventActionReconcile, eventReasonReconcileFailed).236 WithStatusAction(chi).237 WithStatusError(chi).238 Error("Reconcile CHI %s failed to reconcile ConfigMap %s", chi.Name, configMapCommon.Name)239 return err240 }241 // ConfigMap common for all users resources in CHI242 configMapUsers := w.creator.CreateConfigMapCHICommonUsers()243 if err := w.reconcileConfigMap(chi, configMapUsers); err != nil {244 w.a.WithEvent(chi, eventActionReconcile, eventReasonReconcileFailed).245 WithStatusAction(chi).246 WithStatusError(chi).247 Error("Reconcile CHI %s failed to reconcile ConfigMap %s", chi.Name, configMapUsers.Name)248 return err249 }250 // Add here other CHI components to be reconciled251 return nil252}253// reconcileCluster reconciles Cluster, excluding nested shards254func (w *worker) reconcileCluster(cluster *chop.ChiCluster) error {255 // Add Cluster's Service256 service := w.creator.CreateServiceCluster(cluster)257 if service == nil {258 // TODO259 // For somewhat reason Service is not created, this is an error, but not clear what to do about it260 return nil261 }262 return w.reconcileService(cluster.CHI, service)263}264// reconcileShard reconciles Shard, excluding nested replicas265func (w *worker) reconcileShard(shard *chop.ChiShard) error {266 // Add Shard's Service267 service := w.creator.CreateServiceShard(shard)268 if service == nil {269 // TODO270 // For somewhat reason Service is not created, this is an error, but not clear what to do about it271 return nil272 }273 return w.reconcileService(shard.CHI, service)274}275// reconcileHost reconciles ClickHouse host276func (w *worker) reconcileHost(host *chop.ChiHost) error {277 w.a.V(1).278 WithEvent(host.CHI, eventActionReconcile, eventReasonReconcileStarted).279 WithStatusAction(host.CHI).280 Info("Reconcile Host %s started", host.Name)281 // Add host's ConfigMap282 configMap := w.creator.CreateConfigMapHost(host)283 if err := w.reconcileConfigMap(host.CHI, configMap); err != nil {284 w.a.WithEvent(host.CHI, eventActionReconcile, eventReasonReconcileFailed).285 WithStatusAction(host.CHI).286 WithStatusError(host.CHI).287 Error("Reconcile Host %s failed to reconcile ConfigMap %s", host.Name, configMap.Name)288 return err289 }290 // Add host's StatefulSet291 statefulSet := w.creator.CreateStatefulSet(host)292 curStatefulSet, _ := w.c.getStatefulSet(&statefulSet.ObjectMeta, false)293 if err := w.reconcileStatefulSet(statefulSet, host); err != nil {294 w.a.WithEvent(host.CHI, eventActionReconcile, eventReasonReconcileFailed).295 WithStatusAction(host.CHI).296 WithStatusError(host.CHI).297 Error("Reconcile Host %s failed to reconcile StatefulSet %s", host.Name, statefulSet.Name)298 return err299 }300 // Add host's Service301 service := w.creator.CreateServiceHost(host)302 if err := w.reconcileService(host.CHI, service); err != nil {303 w.a.WithEvent(host.CHI, eventActionReconcile, eventReasonReconcileFailed).304 WithStatusAction(host.CHI).305 WithStatusError(host.CHI).306 Error("Reconcile Host %s failed to reconcile Service %s", host.Name, service.Name)307 return err308 }309 w.a.V(1).310 WithEvent(host.CHI, eventActionReconcile, eventReasonReconcileCompleted).311 WithStatusAction(host.CHI).312 Info("Reconcile Host %s completed", host.Name)313 // Create Tables on a Host if new stateful set is created314 if curStatefulSet == nil {315 err := w.schemer.HostCreateTables(host)316 if err == nil {317 w.a.V(1).318 WithEvent(host.CHI, eventActionCreate, eventReasonCreateCompleted).319 WithStatusAction(host.CHI).320 Info("Created schema objects on host %s replica %d to shard %d in cluster %s",321 host.Name, host.Address.ReplicaIndex, host.Address.ShardIndex, host.Address.ClusterName)322 } else {323 w.a.WithEvent(host.CHI, eventActionCreate, eventReasonCreateFailed).324 WithStatusError(host.CHI).325 Error("FAILED to create schema objects on host %s with error %v", host.Name, err)326 }...

Full Screen

Full Screen

reconcile_test.go

Source:reconcile_test.go Github

copy

Full Screen

...29 "sigs.k8s.io/controller-runtime/pkg/client"30 fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"31 "sigs.k8s.io/controller-runtime/pkg/reconcile"32)33func TestReconcile(t *testing.T) {34 gvk := schema.GroupVersionKind{35 Kind: "Testing",36 Group: "operator-sdk",37 Version: "v1beta1",38 }39 eventTime := time.Now()40 testCases := []struct {41 Name string42 GVK schema.GroupVersionKind43 ReconcilePeriod time.Duration44 Runner runner.Runner45 EventHandlers []events.EventHandler46 Client client.Client47 ExpectedObject *unstructured.Unstructured48 Result reconcile.Result49 Request reconcile.Request50 ShouldError bool51 ManageStatus bool52 }{53 {54 Name: "cr not found",55 GVK: gvk,56 ReconcilePeriod: 5 * time.Second,57 Runner: &fake.Runner{58 JobEvents: []eventapi.JobEvent{},59 },60 Client: fakeclient.NewFakeClient(),61 Result: reconcile.Result{},62 Request: reconcile.Request{63 NamespacedName: types.NamespacedName{64 Name: "not_found",65 Namespace: "default",66 },67 },68 },69 {70 Name: "completed reconcile",71 GVK: gvk,72 ReconcilePeriod: 5 * time.Second,73 ManageStatus: true,74 Runner: &fake.Runner{75 JobEvents: []eventapi.JobEvent{76 eventapi.JobEvent{77 Event: eventapi.EventPlaybookOnStats,78 Created: eventapi.EventTime{Time: eventTime},79 },80 },81 },82 Client: fakeclient.NewFakeClient(&unstructured.Unstructured{83 Object: map[string]interface{}{84 "metadata": map[string]interface{}{85 "name": "reconcile",86 "namespace": "default",87 },88 "apiVersion": "operator-sdk/v1beta1",89 "kind": "Testing",90 },91 }),92 Result: reconcile.Result{93 RequeueAfter: 5 * time.Second,94 },95 Request: reconcile.Request{96 NamespacedName: types.NamespacedName{97 Name: "reconcile",98 Namespace: "default",99 },100 },101 ExpectedObject: &unstructured.Unstructured{102 Object: map[string]interface{}{103 "metadata": map[string]interface{}{104 "name": "reconcile",105 "namespace": "default",106 },107 "apiVersion": "operator-sdk/v1beta1",108 "kind": "Testing",109 "spec": map[string]interface{}{},110 "status": map[string]interface{}{111 "conditions": []interface{}{112 map[string]interface{}{113 "status": "True",114 "type": "Running",115 "ansibleResult": map[string]interface{}{116 "changed": int64(0),117 "failures": int64(0),118 "ok": int64(0),119 "skipped": int64(0),120 "completion": eventTime.Format("2006-01-02T15:04:05.99999999"),121 },122 "message": "Awaiting next reconciliation",123 "reason": "Successful",124 },125 },126 },127 },128 },129 },130 {131 Name: "Failure event runner on failed with manageStatus == true",132 GVK: gvk,133 ManageStatus: true,134 Runner: &fake.Runner{135 JobEvents: []eventapi.JobEvent{136 eventapi.JobEvent{137 Event: eventapi.EventRunnerOnFailed,138 Created: eventapi.EventTime{Time: eventTime},139 EventData: map[string]interface{}{140 "res": map[string]interface{}{141 "msg": "new failure message",142 },143 },144 },145 eventapi.JobEvent{146 Event: eventapi.EventPlaybookOnStats,147 Created: eventapi.EventTime{Time: eventTime},148 },149 },150 },151 Client: fakeclient.NewFakeClient(&unstructured.Unstructured{152 Object: map[string]interface{}{153 "metadata": map[string]interface{}{154 "name": "reconcile",155 "namespace": "default",156 },157 "apiVersion": "operator-sdk/v1beta1",158 "kind": "Testing",159 "spec": map[string]interface{}{},160 },161 }),162 Request: reconcile.Request{163 NamespacedName: types.NamespacedName{164 Name: "reconcile",165 Namespace: "default",166 },167 },168 ExpectedObject: &unstructured.Unstructured{169 Object: map[string]interface{}{170 "metadata": map[string]interface{}{171 "name": "reconcile",172 "namespace": "default",173 },174 "apiVersion": "operator-sdk/v1beta1",175 "kind": "Testing",176 "spec": map[string]interface{}{},177 "status": map[string]interface{}{178 "conditions": []interface{}{179 map[string]interface{}{180 "status": "False",181 "type": "Running",182 "message": "Running reconciliation",183 "reason": "Running",184 },185 map[string]interface{}{186 "status": "True",187 "type": "Failure",188 "ansibleResult": map[string]interface{}{189 "changed": int64(0),190 "failures": int64(0),191 "ok": int64(0),192 "skipped": int64(0),193 "completion": eventTime.Format("2006-01-02T15:04:05.99999999"),194 },195 "message": "new failure message",196 "reason": "Failed",197 },198 },199 },200 },201 },202 ShouldError: true,203 },204 {205 Name: "Failure event runner on failed",206 GVK: gvk,207 ManageStatus: false,208 Runner: &fake.Runner{209 JobEvents: []eventapi.JobEvent{210 eventapi.JobEvent{211 Event: eventapi.EventRunnerOnFailed,212 Created: eventapi.EventTime{Time: eventTime},213 EventData: map[string]interface{}{214 "res": map[string]interface{}{215 "msg": "new failure message",216 },217 },218 },219 eventapi.JobEvent{220 Event: eventapi.EventPlaybookOnStats,221 Created: eventapi.EventTime{Time: eventTime},222 },223 },224 },225 Client: fakeclient.NewFakeClient(&unstructured.Unstructured{226 Object: map[string]interface{}{227 "metadata": map[string]interface{}{228 "name": "reconcile",229 "namespace": "default",230 },231 "apiVersion": "operator-sdk/v1beta1",232 "kind": "Testing",233 "spec": map[string]interface{}{},234 },235 }),236 Request: reconcile.Request{237 NamespacedName: types.NamespacedName{238 Name: "reconcile",239 Namespace: "default",240 },241 },242 ShouldError: true,243 },244 {245 Name: "Finalizer successful reconcile",246 GVK: gvk,247 ReconcilePeriod: 5 * time.Second,248 ManageStatus: true,249 Runner: &fake.Runner{250 JobEvents: []eventapi.JobEvent{251 eventapi.JobEvent{252 Event: eventapi.EventPlaybookOnStats,253 Created: eventapi.EventTime{Time: eventTime},254 },255 },256 Finalizer: "testing.io",257 },258 Client: fakeclient.NewFakeClient(&unstructured.Unstructured{259 Object: map[string]interface{}{260 "metadata": map[string]interface{}{261 "name": "reconcile",262 "namespace": "default",263 "annotations": map[string]interface{}{264 controller.ReconcilePeriodAnnotation: "3s",265 },266 },267 "apiVersion": "operator-sdk/v1beta1",268 "kind": "Testing",269 "spec": map[string]interface{}{},270 },271 }),272 Result: reconcile.Result{273 RequeueAfter: 3 * time.Second,274 },275 Request: reconcile.Request{276 NamespacedName: types.NamespacedName{277 Name: "reconcile",278 Namespace: "default",279 },280 },281 ExpectedObject: &unstructured.Unstructured{282 Object: map[string]interface{}{283 "metadata": map[string]interface{}{284 "name": "reconcile",285 "namespace": "default",286 "annotations": map[string]interface{}{287 controller.ReconcilePeriodAnnotation: "3s",288 },289 "finalizers": []interface{}{290 "testing.io",291 },292 },293 "apiVersion": "operator-sdk/v1beta1",294 "kind": "Testing",295 "spec": map[string]interface{}{},296 "status": map[string]interface{}{297 "conditions": []interface{}{298 map[string]interface{}{299 "status": "True",300 "type": "Running",301 "ansibleResult": map[string]interface{}{302 "changed": int64(0),303 "failures": int64(0),304 "ok": int64(0),305 "skipped": int64(0),306 "completion": eventTime.Format("2006-01-02T15:04:05.99999999"),307 },308 "message": "Awaiting next reconciliation",309 "reason": "Successful",310 },311 },312 },313 },314 },315 },316 {317 Name: "reconcile deletetion",318 GVK: gvk,319 ReconcilePeriod: 5 * time.Second,320 Runner: &fake.Runner{321 JobEvents: []eventapi.JobEvent{322 eventapi.JobEvent{323 Event: eventapi.EventPlaybookOnStats,324 Created: eventapi.EventTime{Time: eventTime},325 },326 },327 Finalizer: "testing.io",328 },329 Client: fakeclient.NewFakeClient(&unstructured.Unstructured{330 Object: map[string]interface{}{331 "metadata": map[string]interface{}{332 "name": "reconcile",333 "namespace": "default",334 "annotations": map[string]interface{}{335 controller.ReconcilePeriodAnnotation: "3s",336 },337 "deletionTimestamp": eventTime.Format(time.RFC3339),338 },339 "apiVersion": "operator-sdk/v1beta1",340 "kind": "Testing",341 "spec": map[string]interface{}{},342 },343 }),344 Result: reconcile.Result{},345 Request: reconcile.Request{346 NamespacedName: types.NamespacedName{347 Name: "reconcile",348 Namespace: "default",349 },350 },351 },352 {353 Name: "Finalizer successful deletion reconcile",354 GVK: gvk,355 ReconcilePeriod: 5 * time.Second,356 ManageStatus: true,357 Runner: &fake.Runner{358 JobEvents: []eventapi.JobEvent{359 eventapi.JobEvent{360 Event: eventapi.EventPlaybookOnStats,361 Created: eventapi.EventTime{Time: eventTime},362 },363 },364 Finalizer: "testing.io",365 },366 Client: fakeclient.NewFakeClient(&unstructured.Unstructured{367 Object: map[string]interface{}{368 "metadata": map[string]interface{}{369 "name": "reconcile",370 "namespace": "default",371 "finalizers": []interface{}{372 "testing.io",373 },374 "deletionTimestamp": eventTime.Format(time.RFC3339),375 },376 "apiVersion": "operator-sdk/v1beta1",377 "kind": "Testing",378 "spec": map[string]interface{}{},379 "status": map[string]interface{}{380 "conditions": []interface{}{381 map[string]interface{}{382 "status": "True",383 "type": "Running",384 "ansibleResult": map[string]interface{}{385 "changed": int64(0),386 "failures": int64(0),387 "ok": int64(0),388 "skipped": int64(0),389 "completion": eventTime.Format("2006-01-02T15:04:05.99999999"),390 },391 "message": "Awaiting next reconciliation",392 "reason": "Successful",393 },394 },395 },396 },397 }),398 Result: reconcile.Result{399 RequeueAfter: 5 * time.Second,400 },401 Request: reconcile.Request{402 NamespacedName: types.NamespacedName{403 Name: "reconcile",404 Namespace: "default",405 },406 },407 ExpectedObject: &unstructured.Unstructured{408 Object: map[string]interface{}{409 "metadata": map[string]interface{}{410 "name": "reconcile",411 "namespace": "default",412 },413 "apiVersion": "operator-sdk/v1beta1",414 "kind": "Testing",415 "spec": map[string]interface{}{},416 "status": map[string]interface{}{417 "conditions": []interface{}{418 map[string]interface{}{419 "status": "True",420 "type": "Running",421 "ansibleResult": map[string]interface{}{422 "changed": int64(0),423 "failures": int64(0),424 "ok": int64(0),425 "skipped": int64(0),426 "completion": eventTime.Format("2006-01-02T15:04:05.99999999"),427 },428 "message": "Awaiting next reconciliation",429 "reason": "Successful",430 },431 },432 },433 },434 },435 },436 {437 Name: "No status event",438 GVK: gvk,439 ReconcilePeriod: 5 * time.Second,440 Runner: &fake.Runner{441 JobEvents: []eventapi.JobEvent{442 eventapi.JobEvent{443 Created: eventapi.EventTime{Time: eventTime},444 },445 },446 },447 Client: fakeclient.NewFakeClient(&unstructured.Unstructured{448 Object: map[string]interface{}{449 "metadata": map[string]interface{}{450 "name": "reconcile",451 "namespace": "default",452 },453 "apiVersion": "operator-sdk/v1beta1",454 "kind": "Testing",455 "spec": map[string]interface{}{},456 "status": map[string]interface{}{457 "conditions": []interface{}{458 map[string]interface{}{459 "status": "True",460 "type": "Running",461 "ansibleResult": map[string]interface{}{462 "changed": int64(0),463 "failures": int64(0),464 "ok": int64(0),465 "skipped": int64(0),466 "completion": eventTime.Format("2006-01-02T15:04:05.99999999"),467 },468 "message": "Failed to get ansible-runner stdout",469 },470 },471 },472 },473 }),474 Result: reconcile.Result{475 RequeueAfter: 5 * time.Second,476 },477 Request: reconcile.Request{478 NamespacedName: types.NamespacedName{479 Name: "reconcile",480 Namespace: "default",481 },482 },483 ShouldError: true,484 },485 {486 Name: "no manage status",487 GVK: gvk,488 ReconcilePeriod: 5 * time.Second,489 ManageStatus: false,490 Runner: &fake.Runner{491 JobEvents: []eventapi.JobEvent{492 eventapi.JobEvent{493 Event: eventapi.EventPlaybookOnStats,494 Created: eventapi.EventTime{Time: eventTime},495 },496 },497 },498 Client: fakeclient.NewFakeClient(&unstructured.Unstructured{499 Object: map[string]interface{}{500 "metadata": map[string]interface{}{501 "name": "reconcile",502 "namespace": "default",503 },504 "apiVersion": "operator-sdk/v1beta1",505 "kind": "Testing",506 },507 }),508 Result: reconcile.Result{509 RequeueAfter: 5 * time.Second,510 },511 Request: reconcile.Request{512 NamespacedName: types.NamespacedName{513 Name: "reconcile",514 Namespace: "default",515 },516 },517 ExpectedObject: &unstructured.Unstructured{518 Object: map[string]interface{}{519 "metadata": map[string]interface{}{520 "name": "reconcile",521 "namespace": "default",522 },523 "apiVersion": "operator-sdk/v1beta1",524 "kind": "Testing",525 "spec": map[string]interface{}{},526 "status": map[string]interface{}{},527 },528 },529 },530 }531 for _, tc := range testCases {532 t.Run(tc.Name, func(t *testing.T) {533 var aor reconcile.Reconciler = &controller.AnsibleOperatorReconciler{534 GVK: tc.GVK,535 Runner: tc.Runner,536 Client: tc.Client,537 APIReader: tc.Client,538 EventHandlers: tc.EventHandlers,539 ReconcilePeriod: tc.ReconcilePeriod,540 ManageStatus: tc.ManageStatus,541 }542 result, err := aor.Reconcile(tc.Request)543 if err != nil && !tc.ShouldError {544 t.Fatalf("Unexpected error: %v", err)545 }546 if !reflect.DeepEqual(result, tc.Result) {547 t.Fatalf("Reconcile result does not equal\nexpected: %#v\nactual: %#v", tc.Result, result)548 }549 if tc.ExpectedObject != nil {550 actualObject := &unstructured.Unstructured{}551 actualObject.SetGroupVersionKind(tc.ExpectedObject.GroupVersionKind())552 err := tc.Client.Get(context.TODO(), types.NamespacedName{553 Name: tc.ExpectedObject.GetName(),554 Namespace: tc.ExpectedObject.GetNamespace(),555 }, actualObject)556 if err != nil {557 t.Fatalf("Failed to get object: (%v)", err)558 }559 if !reflect.DeepEqual(actualObject.GetAnnotations(), tc.ExpectedObject.GetAnnotations()) {560 t.Fatalf("Annotations are not the same\nexpected: %v\nactual: %v",561 tc.ExpectedObject.GetAnnotations(), actualObject.GetAnnotations())...

Full Screen

Full Screen

reconcile.go

Source:reconcile.go Github

copy

Full Screen

...38 logf "sigs.k8s.io/controller-runtime/pkg/log"39 "sigs.k8s.io/controller-runtime/pkg/reconcile"40)41const (42 // ReconcilePeriodAnnotation - annotation used by a user to specify the reconciliation interval for the CR.43 // To use create a CR with an annotation "ansible.operator-sdk/reconcile-period: 30s" or some other valid44 // Duration. This will override the operators/or controllers reconcile period for that particular CR.45 ReconcilePeriodAnnotation = "ansible.operator-sdk/reconcile-period"46)47// AnsibleOperatorReconciler - object to reconcile runner requests48type AnsibleOperatorReconciler struct {49 GVK schema.GroupVersionKind50 Runner runner.Runner51 Client client.Client52 APIReader client.Reader53 EventHandlers []events.EventHandler54 ReconcilePeriod time.Duration55 ManageStatus bool56 AnsibleDebugLogs bool57}58// Reconcile - handle the event.59func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {60 u := &unstructured.Unstructured{}61 u.SetGroupVersionKind(r.GVK)62 err := r.Client.Get(context.TODO(), request.NamespacedName, u)63 if apierrors.IsNotFound(err) {64 return reconcile.Result{}, nil65 }66 if err != nil {67 return reconcile.Result{}, err68 }69 ident := strconv.Itoa(rand.Int())70 logger := logf.Log.WithName("reconciler").WithValues(71 "job", ident,72 "name", u.GetName(),73 "namespace", u.GetNamespace(),74 )75 reconcileResult := reconcile.Result{RequeueAfter: r.ReconcilePeriod}76 if ds, ok := u.GetAnnotations()[ReconcilePeriodAnnotation]; ok {77 duration, err := time.ParseDuration(ds)78 if err != nil {79 // Should attempt to update to a failed condition80 errmark := r.markError(u, request.NamespacedName,81 fmt.Sprintf("Unable to parse reconcile period annotation: %v", err))82 if errmark != nil {83 logger.Error(errmark, "Unable to mark error annotation")84 }85 logger.Error(err, "Unable to parse reconcile period annotation")86 return reconcileResult, err87 }88 reconcileResult.RequeueAfter = duration89 }90 deleted := u.GetDeletionTimestamp() != nil91 finalizer, finalizerExists := r.Runner.GetFinalizer()92 pendingFinalizers := u.GetFinalizers()93 // If the resource is being deleted we don't want to add the finalizer again94 if finalizerExists && !deleted && !contains(pendingFinalizers, finalizer) {95 logger.V(1).Info("Adding finalizer to resource", "Finalizer", finalizer)96 finalizers := append(pendingFinalizers, finalizer)97 u.SetFinalizers(finalizers)98 err := r.Client.Update(context.TODO(), u)99 if err != nil {100 logger.Error(err, "Unable to update cr with finalizer")101 return reconcileResult, err102 }103 }104 if !contains(pendingFinalizers, finalizer) && deleted {105 logger.Info("Resource is terminated, skipping reconciliation")106 return reconcile.Result{}, nil107 }108 spec := u.Object["spec"]109 _, ok := spec.(map[string]interface{})110 // Need to handle cases where there is no spec.111 // We can add the spec to the object, which will allow112 // everything to work, and will not get updated.113 // Therefore we can now deal with the case of secrets and configmaps.114 if !ok {115 logger.V(1).Info("Spec was not found")116 u.Object["spec"] = map[string]interface{}{}117 }118 if r.ManageStatus {119 errmark := r.markRunning(u, request.NamespacedName)120 if errmark != nil {121 logger.Error(errmark, "Unable to update the status to mark cr as running")122 return reconcileResult, errmark123 }124 }125 ownerRef := metav1.OwnerReference{126 APIVersion: u.GetAPIVersion(),127 Kind: u.GetKind(),128 Name: u.GetName(),129 UID: u.GetUID(),130 }131 kc, err := kubeconfig.Create(ownerRef, "http://localhost:8888", u.GetNamespace())132 if err != nil {133 errmark := r.markError(u, request.NamespacedName, "Unable to run reconciliation")134 if errmark != nil {135 logger.Error(errmark, "Unable to mark error to run reconciliation")136 }137 logger.Error(err, "Unable to generate kubeconfig")138 return reconcileResult, err139 }140 defer func() {141 if err := os.Remove(kc.Name()); err != nil {142 logger.Error(err, "Failed to remove generated kubeconfig file")143 }144 }()145 result, err := r.Runner.Run(ident, u, kc.Name())146 if err != nil {147 errmark := r.markError(u, request.NamespacedName, "Unable to run reconciliation")148 if errmark != nil {149 logger.Error(errmark, "Unable to mark error to run reconciliation")150 }151 logger.Error(err, "Unable to run ansible runner")152 return reconcileResult, err153 }154 // iterate events from ansible, looking for the final one155 statusEvent := eventapi.StatusJobEvent{}156 failureMessages := eventapi.FailureMessages{}157 for event := range result.Events() {158 for _, eHandler := range r.EventHandlers {159 go eHandler.Handle(ident, u, event)160 }161 if event.Event == eventapi.EventPlaybookOnStats {162 // convert to StatusJobEvent; would love a better way to do this163 data, err := json.Marshal(event)164 if err != nil {165 printEventStats(statusEvent)166 return reconcile.Result{}, err167 }168 err = json.Unmarshal(data, &statusEvent)169 if err != nil {170 printEventStats(statusEvent)171 return reconcile.Result{}, err172 }173 }174 if event.Event == eventapi.EventRunnerOnFailed && !event.IgnoreError() {175 failureMessages = append(failureMessages, event.GetFailedPlaybookMessage())176 }177 }178 // To print the stats of the task179 printEventStats(statusEvent)180 // To print the full ansible result181 r.printAnsibleResult(result)182 if statusEvent.Event == "" {183 eventErr := errors.New("did not receive playbook_on_stats event")184 stdout, err := result.Stdout()185 if err != nil {186 errmark := r.markError(u, request.NamespacedName, "Failed to get ansible-runner stdout")187 if errmark != nil {188 logger.Error(errmark, "Unable to mark error to run reconciliation")189 }190 logger.Error(err, "Failed to get ansible-runner stdout")191 return reconcileResult, err192 }193 logger.Error(eventErr, stdout)194 return reconcileResult, eventErr195 }196 // Need to get the unstructured object after the Ansible runner finishes.197 // This needs to hit the API server to retrieve updates.198 err = r.APIReader.Get(context.TODO(), request.NamespacedName, u)199 if err != nil {200 if apierrors.IsNotFound(err) {201 return reconcile.Result{}, nil202 }203 return reconcile.Result{}, err204 }205 // try to get the updated finalizers206 pendingFinalizers = u.GetFinalizers()207 // We only want to update the CustomResource once, so we'll track changes208 // and do it at the end209 runSuccessful := len(failureMessages) == 0210 // The finalizer has run successfully, time to remove it211 if deleted && finalizerExists && runSuccessful {212 finalizers := []string{}213 for _, pendingFinalizer := range pendingFinalizers {214 if pendingFinalizer != finalizer {215 finalizers = append(finalizers, pendingFinalizer)216 }217 }218 u.SetFinalizers(finalizers)219 err := r.Client.Update(context.TODO(), u)220 if err != nil {221 logger.Error(err, "Failed to remove finalizer")222 return reconcileResult, err223 }224 }225 if r.ManageStatus {226 errmark := r.markDone(u, request.NamespacedName, statusEvent, failureMessages)227 if errmark != nil {228 logger.Error(errmark, "Failed to mark status done")229 }230 // re-trigger reconcile because of failures231 if !runSuccessful {232 return reconcileResult, errors.New("event runner on failed")233 }234 return reconcileResult, errmark235 }236 // re-trigger reconcile because of failures237 if !runSuccessful {238 return reconcileResult, errors.New("received failed task event")239 }240 return reconcileResult, nil241}242func printEventStats(statusEvent eventapi.StatusJobEvent) {243 if len(statusEvent.StdOut) > 0 {244 fmt.Printf("\n--------------------------- Ansible Task Status Event StdOut -----------------\n")245 fmt.Println(statusEvent.StdOut)246 fmt.Printf("\n-------------------------------------------------------------------------------\n")247 }248}249func (r *AnsibleOperatorReconciler) printAnsibleResult(result runner.RunResult) {250 if r.AnsibleDebugLogs {251 if res, err := result.Stdout(); err == nil && len(res) > 0 {252 fmt.Printf("\n--------------------------- Ansible Debug Result -----------------------------\n")253 fmt.Println(res)254 fmt.Printf("\n-------------------------------------------------------------------------------\n")255 }256 }257}258func (r *AnsibleOperatorReconciler) markRunning(u *unstructured.Unstructured,259 namespacedName types.NamespacedName) error {260 // Get the latest resource to prevent updating a stale status.261 if err := r.APIReader.Get(context.TODO(), namespacedName, u); err != nil {262 return err263 }264 crStatus := getStatus(u)265 // If there is no current status add that we are working on this resource.266 errCond := ansiblestatus.GetCondition(crStatus, ansiblestatus.FailureConditionType)267 if errCond != nil {268 errCond.Status = v1.ConditionFalse269 ansiblestatus.SetCondition(&crStatus, *errCond)270 }271 // If the condition is currently running, making sure that the values are correct.272 // If they are the same a no-op, if they are different then it is a good thing we273 // are updating it.274 c := ansiblestatus.NewCondition(275 ansiblestatus.RunningConditionType,276 v1.ConditionTrue,277 nil,278 ansiblestatus.RunningReason,279 ansiblestatus.RunningMessage,280 )281 ansiblestatus.SetCondition(&crStatus, *c)282 u.Object["status"] = crStatus.GetJSONMap()283 return r.Client.Status().Update(context.TODO(), u)284}285// markError - used to alert the user to the issues during the validation of a reconcile run.286// i.e Annotations that could be incorrect287func (r *AnsibleOperatorReconciler) markError(u *unstructured.Unstructured, namespacedName types.NamespacedName,288 failureMessage string) error {289 logger := logf.Log.WithName("markError")290 // Immediately update metrics with failed reconciliation, since Get()291 // may fail.292 metrics.ReconcileFailed(r.GVK.String())293 // Get the latest resource to prevent updating a stale status.294 if err := r.APIReader.Get(context.TODO(), namespacedName, u); err != nil {295 if apierrors.IsNotFound(err) {296 logger.Info("Resource not found, assuming it was deleted")297 return nil298 }299 return err300 }301 crStatus := getStatus(u)302 sc := ansiblestatus.GetCondition(crStatus, ansiblestatus.RunningConditionType)303 if sc != nil {304 sc.Status = v1.ConditionFalse305 ansiblestatus.SetCondition(&crStatus, *sc)306 }307 c := ansiblestatus.NewCondition(308 ansiblestatus.FailureConditionType,309 v1.ConditionTrue,310 nil,311 ansiblestatus.FailedReason,312 failureMessage,313 )314 ansiblestatus.SetCondition(&crStatus, *c)315 // This needs the status subresource to be enabled by default.316 u.Object["status"] = crStatus.GetJSONMap()317 return r.Client.Status().Update(context.TODO(), u)318}319func (r *AnsibleOperatorReconciler) markDone(u *unstructured.Unstructured, namespacedName types.NamespacedName,320 statusEvent eventapi.StatusJobEvent, failureMessages eventapi.FailureMessages) error {321 logger := logf.Log.WithName("markDone")322 // Get the latest resource to prevent updating a stale status.323 if err := r.APIReader.Get(context.TODO(), namespacedName, u); err != nil {324 if apierrors.IsNotFound(err) {325 logger.Info("Resource not found, assuming it was deleted")326 return nil327 }328 return err329 }330 crStatus := getStatus(u)331 runSuccessful := len(failureMessages) == 0332 ansibleStatus := ansiblestatus.NewAnsibleResultFromStatusJobEvent(statusEvent)333 if !runSuccessful {334 metrics.ReconcileFailed(r.GVK.String())335 sc := ansiblestatus.GetCondition(crStatus, ansiblestatus.RunningConditionType)336 if sc != nil {337 sc.Status = v1.ConditionFalse338 ansiblestatus.SetCondition(&crStatus, *sc)339 }340 c := ansiblestatus.NewCondition(341 ansiblestatus.FailureConditionType,342 v1.ConditionTrue,343 ansibleStatus,344 ansiblestatus.FailedReason,345 strings.Join(failureMessages, "\n"),346 )347 ansiblestatus.SetCondition(&crStatus, *c)348 } else {349 metrics.ReconcileSucceeded(r.GVK.String())350 c := ansiblestatus.NewCondition(351 ansiblestatus.RunningConditionType,352 v1.ConditionTrue,353 ansibleStatus,354 ansiblestatus.SuccessfulReason,355 ansiblestatus.SuccessfulMessage,356 )357 // Remove the failure condition if set, because this completed successfully.358 ansiblestatus.RemoveCondition(&crStatus, ansiblestatus.FailureConditionType)359 ansiblestatus.SetCondition(&crStatus, *c)360 }361 // This needs the status subresource to be enabled by default.362 u.Object["status"] = crStatus.GetJSONMap()363 return r.Client.Status().Update(context.TODO(), u)...

Full Screen

Full Screen

Reconcile

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 watcher, err := fsnotify.NewWatcher()4 if err != nil {5 fmt.Println("Error:", err)6 }7 defer watcher.Close()8 done := make(chan bool)9 go func() {10 for {11 select {12 if !ok {13 }14 if event.Op&fsnotify.Write == fsnotify.Write {15 fmt.Println("modified file:", event.Name)16 }17 if !ok {18 }19 fmt.Println("error:", err)20 }21 }22 }()23 err = watcher.Add("/home/ashish/Documents/GoLang")24 if err != nil {25 fmt.Println("Error:", err)26 }27}

Full Screen

Full Screen

Reconcile

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 watcher, err := fsnotify.NewWatcher()4 if err != nil {5 fmt.Println("ERROR", err)6 }7 defer watcher.Close()8 done := make(chan bool)9 go func() {10 for {11 select {12 if !ok {13 }14 fmt.Println("EVENT!", event)15 if event.Op&fsnotify.Write == fsnotify.Write {16 fmt.Println("modified file:", event.Name)17 }18 if !ok {19 }20 fmt.Println("ERROR", err)21 }22 }23 }()24 err = watcher.Add("/tmp/test.txt")25 if err != nil {26 fmt.Println("ERROR", err)27 }28}29EVENT! {/tmp/test.txt WRITE}

Full Screen

Full Screen

Reconcile

Using AI Code Generation

copy

Full Screen

1 func (r *Reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {2 memcached := &cachev1alpha1.Memcached{}3 err := r.client.Get(context.TODO(), request.NamespacedName, memcached)4 if err != nil {5 if errors.IsNotFound(err) {6 return reconcile.Result{}, nil7 }8 return reconcile.Result{}, err9 }10 found := &appsv1.Deployment{}11 err = r.client.Get(context.TODO(), types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found)12 if err != nil && errors.IsNotFound(err) {13 dep := r.deploymentForMemcached(memcached)14 reqLogger := log.WithValues("Request.Namespace", dep.Namespace, "Request.Name", dep.Name)15 reqLogger.Info("Creating a new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)16 err = r.client.Create(context.TODO(), dep)17 if err != nil {18 return reconcile.Result{}, err19 }20 return reconcile.Result{}, nil21 } else if err != nil {22 return reconcile.Result{}, err23 }24 if *found.Spec.Replicas != size {25 err = r.client.Update(context.TODO(), found)26 if err != nil {27 return reconcile.Result{}, err28 }29 }30 podList := &corev1.PodList{}31 listOpts := []client.ListOption{32 client.InNamespace(memcached.Namespace),33 client.MatchingLabels(labelsForMemcached(memcached.Name)),

Full Screen

Full Screen

Reconcile

Using AI Code Generation

copy

Full Screen

1import (2type Event struct {3 Reconcile func()4}5func (e Event) String() string {6 return fmt.Sprintf("Event: %s", e.Name)7}8func main() {9 e := Event{10 Reconcile: func() {11 fmt.Println("Reconcile")12 },13 }14 e.Reconcile()15}

Full Screen

Full Screen

Reconcile

Using AI Code Generation

copy

Full Screen

1import (2type Event struct {3}4type EventReceiver interface {5 Reconcile(e *Event)6}7type EventReceiverFunc func(e *Event)8func (f EventReceiverFunc) Reconcile(e *Event) {9 f(e)10}11func NewEventReceiverFunc(f func(e *Event)) EventReceiver {12 return EventReceiverFunc(f)13}14func main() {15 er := NewEventReceiverFunc(func(e *Event) {16 fmt.Println("Event received", e)17 })18 e := &Event{19 LastModified: time.Now(),20 }21 er.Reconcile(e)22}23Event received &{path/to/file 2021-07-18 13:08:30.7677078 +0530 IST m=+0.000000001 1234}

Full Screen

Full Screen

Reconcile

Using AI Code Generation

copy

Full Screen

1import (2type Event struct {3 newValue interface{}4 oldValue interface{}5}6func (e *Event) Reconcile() {7 fmt.Println("Reconcile method of Event class called")8}9func main() {10 event := Event{eventType: "add", key: "key1", newValue: 10, oldValue: 0}11 fmt.Println("event:", event)12 fmt.Println("event type:", reflect.TypeOf(event))13 fmt.Println("event value:", reflect.ValueOf(event))14 fmt.Println("event address:", &event)15 fmt.Println("event method:", reflect.ValueOf(event).MethodByName("Reconcile"))16 reflect.ValueOf(&event).MethodByName("Reconcile").Call([]reflect.Value{})17}18event: {add key1 10 0}19event value: {add key1 10 0}20event method: <func(main.Event) Value>

Full Screen

Full Screen

Reconcile

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 e := NewEvent("New event")4 o := NewObserver("New observer")5 e.Register(o)6 e.Trigger()7}8type Event struct {9 observers map[*Observer]struct{}10}11func NewEvent(name string) *Event {12 return &Event{13 observers: make(map[*Observer]struct{}),14 }15}16func (e *Event) Register(o *Observer) {17 e.observers[o] = struct{}{}18}19func (e *Event) Trigger() {20 for o := range e.observers {21 o.Update()22 }23}24type Observer struct {25}26func NewObserver(name string) *Observer {27 return &Observer{name: name}28}29func (o *Observer) Update() {30 fmt.Printf("Observer %s is notified31}

Full Screen

Full Screen

Reconcile

Using AI Code Generation

copy

Full Screen

1import (2type Event struct {3 Data interface{}4}5func (e *Event) Reconcile() {6 fmt.Println("Reconcile method of Event class")7}8type MyEvent struct {9 Data interface{}10}11func (e *MyEvent) Reconcile() {12 fmt.Println("Reconcile method of MyEvent class")13}14func main() {15 e := Event{16 }17 e.Reconcile()18 me := MyEvent{19 }20 me.Reconcile()21 var i interface{}22 if _, ok := i.(Event); ok {23 fmt.Println("MyEvent implements Reconcile method of Event class")24 } else {25 fmt.Println("MyEvent does not implement Reconcile method of Event class")26 }27 if _, ok := i.(MyEvent); ok {28 fmt.Println("MyEvent implements Reconcile method of MyEvent class")29 } else {30 fmt.Println("MyEvent does not implement Reconcile method of MyEvent class")31 }32 if _, ok := i.(interface {33 Reconcile()34 }); ok {35 fmt.Println("MyEvent implements Reconcile method of Event class")36 } else {37 fmt.Println("MyEvent does not implement Reconcile method of Event class")38 }39 if _, ok := i.(interface {

Full Screen

Full Screen

Reconcile

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 event := Event{4 }5 event.Reconcile()6}7import (8func main() {9 event := Event{10 }11 event.Reconcile()12}13import (14func main() {15 event := Event{16 }17 event.Reconcile()18}19import (20func main() {21 event := Event{22 }23 event.Reconcile()24}25import (26func main() {27 event := Event{28 }29 event.Reconcile()30}31import (32func main() {33 event := Event{34 }35 event.Reconcile()36}37import (

Full Screen

Full Screen

Reconcile

Using AI Code Generation

copy

Full Screen

1import (2type Event struct {3 Object interface{}4 OldObject interface{}5 NewObject interface{}6}7const (8func (e *Event) Reconcile() error {9 switch e.Type {10 fmt.Println("Object created: ", e.Object)11 fmt.Println("Object updated: ", e.Object)12 fmt.Println("Object deleted: ", e.Object)13 }14}15func main() {16 event := &Event{17 }18 event.Reconcile()19 time.Sleep(2 * time.Second)20}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful