How to use the Cleanup method of the kafka package

Best Venom code snippet using kafka.Cleanup
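
The snippets below come from several different Go Kafka codebases, so "Cleanup" means different things in each: a consumer-group lifecycle hook, a broker teardown helper, or a Terraform topic cleanup_policy. For orientation, the best-known Cleanup method in the Go Kafka ecosystem is the hook on sarama's ConsumerGroupHandler interface, which runs once per session after all ConsumeClaim goroutines have exited. A minimal sketch (using the classic github.com/Shopify/sarama import path):

package main

import "github.com/Shopify/sarama"

// handler implements sarama.ConsumerGroupHandler. Cleanup runs at the end
// of each consumer-group session, after all ConsumeClaim loops return.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error { return nil }

func (handler) Cleanup(session sarama.ConsumerGroupSession) error {
  // commit any marked offsets before the session ends
  session.Commit()
  return nil
}

func (handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  for msg := range claim.Messages() {
    session.MarkMessage(msg, "")
  }
  return nil
}

// In a real program the handler is passed to (sarama.ConsumerGroup).Consume.
func main() {}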

mdb_kafka_structures.go

Source: mdb_kafka_structures.go (GitHub)

...7 "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"8 "github.com/yandex-cloud/go-genproto/yandex/cloud/mdb/kafka/v1"9 "github.com/yandex-cloud/terraform-provider-yandex/yandex/internal/hashcode"10)11type TopicCleanupPolicy int3212const (13 Topic_CLEANUP_POLICY_UNSPECIFIED TopicCleanupPolicy = 014 // this policy discards log segments when either their retention time or log size limit is reached. See also: [KafkaConfig2_1.log_retention_ms] and other similar parameters.15 Topic_CLEANUP_POLICY_DELETE TopicCleanupPolicy = 116 // this policy compacts messages in log.17 Topic_CLEANUP_POLICY_COMPACT TopicCleanupPolicy = 218 // this policy use both compaction and deletion for messages and log segments.19 Topic_CLEANUP_POLICY_COMPACT_AND_DELETE TopicCleanupPolicy = 320)21const kafkaConfigPath = "config.0.kafka.0.kafka_config.0"22// Enum value maps for TopicCleanupPolicy.23var (24 Topic_CleanupPolicy_name = map[int32]string{25 0: "CLEANUP_POLICY_UNSPECIFIED",26 1: "CLEANUP_POLICY_DELETE",27 2: "CLEANUP_POLICY_COMPACT",28 3: "CLEANUP_POLICY_COMPACT_AND_DELETE",29 }30 Topic_CleanupPolicy_value = map[string]int32{31 "CLEANUP_POLICY_UNSPECIFIED": 0,32 "CLEANUP_POLICY_DELETE": 1,33 "CLEANUP_POLICY_COMPACT": 2,34 "CLEANUP_POLICY_COMPACT_AND_DELETE": 3,35 }36)37func parseKafkaEnv(e string) (kafka.Cluster_Environment, error) {38 v, ok := kafka.Cluster_Environment_value[e]39 if !ok {40 return 0, fmt.Errorf("value for 'environment' must be one of %s, not `%s`",41 getJoinedKeys(getEnumValueMapKeys(kafka.Cluster_Environment_value)), e)42 }43 return kafka.Cluster_Environment(v), nil44}45func parseKafkaCompression(e string) (kafka.CompressionType, error) {46 v, ok := kafka.CompressionType_value[e]47 if !ok || e == "COMPRESSION_TYPE_UNSPECIFIED" {48 return 0, fmt.Errorf("value for 'compression_type' must be one of %s, not `%s`",49 getJoinedKeys(getEnumValueMapKeysExt(kafka.CompressionType_value, true)), e)50 }51 return kafka.CompressionType(v), nil52}53func parseKafkaPermission(e string) (kafka.Permission_AccessRole, error) {54 v, ok := kafka.Permission_AccessRole_value[e]55 if !ok {56 return 0, fmt.Errorf("value for 'role' must be one of %s, not `%s`",57 getJoinedKeys(getEnumValueMapKeys(kafka.Permission_AccessRole_value)), e)58 }59 return kafka.Permission_AccessRole(v), nil60}61func parseKafkaTopicCleanupPolicy(e string) (TopicCleanupPolicy, error) {62 v, ok := Topic_CleanupPolicy_value[e]63 if !ok || e == "CLEANUP_POLICY_UNSPECIFIED" {64 return 0, fmt.Errorf("value for 'cleanup_policy' must be one of %s, not `%s`",65 getJoinedKeys(getEnumValueMapKeysExt(Topic_CleanupPolicy_value, true)), e)66 }67 return TopicCleanupPolicy(v), nil68}69func parseIntKafkaConfigParam(d *schema.ResourceData, paramName string, retErr *error) *wrappers.Int64Value {70 v, ok := d.GetOk(kafkaConfigPath + "." 
+ paramName)71 if !ok {72 return nil73 }74 i, err := strconv.ParseInt(v.(string), 10, 64)75 if err != nil {76 if *retErr != nil {77 *retErr = err78 }79 return nil80 }81 return &wrappers.Int64Value{Value: i}82}83type KafkaConfig struct {84 CompressionType kafka.CompressionType85 LogFlushIntervalMessages *wrappers.Int64Value86 LogFlushIntervalMs *wrappers.Int64Value87 LogFlushSchedulerIntervalMs *wrappers.Int64Value88 LogRetentionBytes *wrappers.Int64Value89 LogRetentionHours *wrappers.Int64Value90 LogRetentionMinutes *wrappers.Int64Value91 LogRetentionMs *wrappers.Int64Value92 LogSegmentBytes *wrappers.Int64Value93 LogPreallocate *wrappers.BoolValue94 SocketSendBufferBytes *wrappers.Int64Value95 SocketReceiveBufferBytes *wrappers.Int64Value96 AutoCreateTopicsEnable *wrappers.BoolValue97 NumPartitions *wrappers.Int64Value98 DefaultReplicationFactor *wrappers.Int64Value99}100func parseKafkaConfig(d *schema.ResourceData) (*KafkaConfig, error) {101 res := &KafkaConfig{}102 if v, ok := d.GetOk(kafkaConfigPath + ".compression_type"); ok {103 value, err := parseKafkaCompression(v.(string))104 if err != nil {105 return nil, err106 }107 res.CompressionType = value108 }109 var retErr error110 res.LogFlushIntervalMessages = parseIntKafkaConfigParam(d, "log_flush_interval_messages", &retErr)111 res.LogFlushIntervalMs = parseIntKafkaConfigParam(d, "log_flush_interval_ms", &retErr)112 res.LogFlushSchedulerIntervalMs = parseIntKafkaConfigParam(d, "log_flush_scheduler_interval_ms", &retErr)113 res.LogRetentionBytes = parseIntKafkaConfigParam(d, "log_retention_bytes", &retErr)114 res.LogRetentionHours = parseIntKafkaConfigParam(d, "log_retention_hours", &retErr)115 res.LogRetentionMinutes = parseIntKafkaConfigParam(d, "log_retention_minutes", &retErr)116 res.LogRetentionMs = parseIntKafkaConfigParam(d, "log_retention_ms", &retErr)117 res.LogSegmentBytes = parseIntKafkaConfigParam(d, "log_segment_bytes", &retErr)118 res.SocketSendBufferBytes = parseIntKafkaConfigParam(d, "socket_send_buffer_bytes", &retErr)119 res.SocketReceiveBufferBytes = parseIntKafkaConfigParam(d, "socket_receive_buffer_bytes", &retErr)120 res.NumPartitions = parseIntKafkaConfigParam(d, "num_partitions", &retErr)121 res.DefaultReplicationFactor = parseIntKafkaConfigParam(d, "default_replication_factor", &retErr)122 if v, ok := d.GetOk(kafkaConfigPath + ".log_preallocate"); ok {123 res.LogPreallocate = &wrappers.BoolValue{Value: v.(bool)}124 }125 if v, ok := d.GetOk(kafkaConfigPath + ".auto_create_topics_enable"); ok {126 res.AutoCreateTopicsEnable = &wrappers.BoolValue{Value: v.(bool)}127 }128 if retErr != nil {129 return nil, retErr130 }131 return res, nil132}133func expandKafkaConfig2_6(d *schema.ResourceData) (*kafka.KafkaConfig2_6, error) {134 kafkaConfig, err := parseKafkaConfig(d)135 if err != nil {136 return nil, err137 }138 return &kafka.KafkaConfig2_6{139 CompressionType: kafkaConfig.CompressionType,140 LogFlushIntervalMessages: kafkaConfig.LogFlushIntervalMessages,141 LogFlushIntervalMs: kafkaConfig.LogFlushIntervalMs,142 LogFlushSchedulerIntervalMs: kafkaConfig.LogFlushSchedulerIntervalMs,143 LogRetentionBytes: kafkaConfig.LogRetentionBytes,144 LogRetentionHours: kafkaConfig.LogRetentionHours,145 LogRetentionMinutes: kafkaConfig.LogRetentionMinutes,146 LogRetentionMs: kafkaConfig.LogRetentionMs,147 LogSegmentBytes: kafkaConfig.LogSegmentBytes,148 LogPreallocate: kafkaConfig.LogPreallocate,149 SocketSendBufferBytes: kafkaConfig.SocketSendBufferBytes,150 SocketReceiveBufferBytes: kafkaConfig.SocketReceiveBufferBytes,151 
AutoCreateTopicsEnable: kafkaConfig.AutoCreateTopicsEnable,152 NumPartitions: kafkaConfig.NumPartitions,153 DefaultReplicationFactor: kafkaConfig.DefaultReplicationFactor,154 }, nil155}156func expandKafkaConfig2_1(d *schema.ResourceData) (*kafka.KafkaConfig2_1, error) {157 kafkaConfig, err := parseKafkaConfig(d)158 if err != nil {159 return nil, err160 }161 return &kafka.KafkaConfig2_1{162 CompressionType: kafkaConfig.CompressionType,163 LogFlushIntervalMessages: kafkaConfig.LogFlushIntervalMessages,164 LogFlushIntervalMs: kafkaConfig.LogFlushIntervalMs,165 LogFlushSchedulerIntervalMs: kafkaConfig.LogFlushSchedulerIntervalMs,166 LogRetentionBytes: kafkaConfig.LogRetentionBytes,167 LogRetentionHours: kafkaConfig.LogRetentionHours,168 LogRetentionMinutes: kafkaConfig.LogRetentionMinutes,169 LogRetentionMs: kafkaConfig.LogRetentionMs,170 LogSegmentBytes: kafkaConfig.LogSegmentBytes,171 LogPreallocate: kafkaConfig.LogPreallocate,172 SocketSendBufferBytes: kafkaConfig.SocketSendBufferBytes,173 SocketReceiveBufferBytes: kafkaConfig.SocketReceiveBufferBytes,174 AutoCreateTopicsEnable: kafkaConfig.AutoCreateTopicsEnable,175 NumPartitions: kafkaConfig.NumPartitions,176 DefaultReplicationFactor: kafkaConfig.DefaultReplicationFactor,177 }, nil178}179func expandKafkaConfig2_8(d *schema.ResourceData) (*kafka.KafkaConfig2_8, error) {180 kafkaConfig, err := parseKafkaConfig(d)181 if err != nil {182 return nil, err183 }184 return &kafka.KafkaConfig2_8{185 CompressionType: kafkaConfig.CompressionType,186 LogFlushIntervalMessages: kafkaConfig.LogFlushIntervalMessages,187 LogFlushIntervalMs: kafkaConfig.LogFlushIntervalMs,188 LogFlushSchedulerIntervalMs: kafkaConfig.LogFlushSchedulerIntervalMs,189 LogRetentionBytes: kafkaConfig.LogRetentionBytes,190 LogRetentionHours: kafkaConfig.LogRetentionHours,191 LogRetentionMinutes: kafkaConfig.LogRetentionMinutes,192 LogRetentionMs: kafkaConfig.LogRetentionMs,193 LogSegmentBytes: kafkaConfig.LogSegmentBytes,194 LogPreallocate: kafkaConfig.LogPreallocate,195 SocketSendBufferBytes: kafkaConfig.SocketSendBufferBytes,196 SocketReceiveBufferBytes: kafkaConfig.SocketReceiveBufferBytes,197 AutoCreateTopicsEnable: kafkaConfig.AutoCreateTopicsEnable,198 NumPartitions: kafkaConfig.NumPartitions,199 DefaultReplicationFactor: kafkaConfig.DefaultReplicationFactor,200 }, nil201}202func expandKafkaConfig3x(d *schema.ResourceData) (*kafka.KafkaConfig3, error) {203 kafkaConfig, err := parseKafkaConfig(d)204 if err != nil {205 return nil, err206 }207 return &kafka.KafkaConfig3{208 CompressionType: kafkaConfig.CompressionType,209 LogFlushIntervalMessages: kafkaConfig.LogFlushIntervalMessages,210 LogFlushIntervalMs: kafkaConfig.LogFlushIntervalMs,211 LogFlushSchedulerIntervalMs: kafkaConfig.LogFlushSchedulerIntervalMs,212 LogRetentionBytes: kafkaConfig.LogRetentionBytes,213 LogRetentionHours: kafkaConfig.LogRetentionHours,214 LogRetentionMinutes: kafkaConfig.LogRetentionMinutes,215 LogRetentionMs: kafkaConfig.LogRetentionMs,216 LogSegmentBytes: kafkaConfig.LogSegmentBytes,217 LogPreallocate: kafkaConfig.LogPreallocate,218 SocketSendBufferBytes: kafkaConfig.SocketSendBufferBytes,219 SocketReceiveBufferBytes: kafkaConfig.SocketReceiveBufferBytes,220 AutoCreateTopicsEnable: kafkaConfig.AutoCreateTopicsEnable,221 NumPartitions: kafkaConfig.NumPartitions,222 DefaultReplicationFactor: kafkaConfig.DefaultReplicationFactor,223 }, nil224}225type TopicConfig struct {226 CleanupPolicy string227 CompressionType kafka.CompressionType228 DeleteRetentionMs *wrappers.Int64Value229 FileDeleteDelayMs 
*wrappers.Int64Value230 FlushMessages *wrappers.Int64Value231 FlushMs *wrappers.Int64Value232 MinCompactionLagMs *wrappers.Int64Value233 RetentionBytes *wrappers.Int64Value234 RetentionMs *wrappers.Int64Value235 MaxMessageBytes *wrappers.Int64Value236 MinInsyncReplicas *wrappers.Int64Value237 SegmentBytes *wrappers.Int64Value238 Preallocate *wrappers.BoolValue239}240func parseIntTopicConfigParam(d *schema.ResourceData, paramPath string, retErr *error) *wrappers.Int64Value {241 paramValue, ok := d.GetOk(paramPath)242 if !ok {243 return nil244 }245 str := paramValue.(string)246 if str == "" {247 return nil248 }249 i, err := strconv.ParseInt(str, 10, 64)250 if err != nil {251 if *retErr != nil {252 *retErr = err253 }254 return nil255 }256 return &wrappers.Int64Value{Value: i}257}258func parseKafkaTopicConfig(d *schema.ResourceData, topicConfigPrefix string) (*TopicConfig, error) {259 key := func(key string) string {260 return fmt.Sprintf("%s%s", topicConfigPrefix, key)261 }262 res := &TopicConfig{}263 if cleanupPolicy := d.Get(key("cleanup_policy")).(string); cleanupPolicy != "" {264 _, err := parseKafkaTopicCleanupPolicy(cleanupPolicy)265 if err != nil {266 return nil, err267 }268 res.CleanupPolicy = cleanupPolicy269 }270 if compressionType := d.Get(key("compression_type")).(string); compressionType != "" {271 value, err := parseKafkaCompression(compressionType)272 if err != nil {273 return nil, err274 }275 res.CompressionType = value276 }277 var retErr error278 res.DeleteRetentionMs = parseIntTopicConfigParam(d, key("delete_retention_ms"), &retErr)279 res.FileDeleteDelayMs = parseIntTopicConfigParam(d, key("file_delete_delay_ms"), &retErr)280 res.FlushMessages = parseIntTopicConfigParam(d, key("flush_messages"), &retErr)281 res.FlushMs = parseIntTopicConfigParam(d, key("flush_ms"), &retErr)282 res.MinCompactionLagMs = parseIntTopicConfigParam(d, key("min_compaction_lag_ms"), &retErr)283 res.RetentionBytes = parseIntTopicConfigParam(d, key("retention_bytes"), &retErr)284 res.RetentionMs = parseIntTopicConfigParam(d, key("retention_ms"), &retErr)285 res.MaxMessageBytes = parseIntTopicConfigParam(d, key("max_message_bytes"), &retErr)286 res.MinInsyncReplicas = parseIntTopicConfigParam(d, key("min_insync_replicas"), &retErr)287 res.SegmentBytes = parseIntTopicConfigParam(d, key("segment_bytes"), &retErr)288 if preallocateRaw, ok := d.GetOk(key("preallocate")); ok {289 res.Preallocate = &wrappers.BoolValue{Value: preallocateRaw.(bool)}290 }291 if retErr != nil {292 return nil, retErr293 }294 return res, nil295}296func expandKafkaTopicConfig2_6(d *schema.ResourceData, topicConfigPath string) (*kafka.TopicConfig2_6, error) {297 topicConfig, err := parseKafkaTopicConfig(d, topicConfigPath)298 if err != nil {299 return nil, err300 }301 res := &kafka.TopicConfig2_6{302 CleanupPolicy: kafka.TopicConfig2_6_CleanupPolicy(kafka.TopicConfig2_6_CleanupPolicy_value[topicConfig.CleanupPolicy]),303 CompressionType: topicConfig.CompressionType,304 DeleteRetentionMs: topicConfig.DeleteRetentionMs,305 FileDeleteDelayMs: topicConfig.FileDeleteDelayMs,306 FlushMessages: topicConfig.FlushMessages,307 FlushMs: topicConfig.FlushMs,308 MinCompactionLagMs: topicConfig.MinCompactionLagMs,309 RetentionBytes: topicConfig.RetentionBytes,310 RetentionMs: topicConfig.RetentionMs,311 MaxMessageBytes: topicConfig.MaxMessageBytes,312 MinInsyncReplicas: topicConfig.MinInsyncReplicas,313 SegmentBytes: topicConfig.SegmentBytes,314 Preallocate: topicConfig.Preallocate,315 }316 return res, nil317}318func expandKafkaTopicConfig2_1(d 
*schema.ResourceData, topicConfigPath string) (*kafka.TopicConfig2_1, error) {319 topicConfig, err := parseKafkaTopicConfig(d, topicConfigPath)320 if err != nil {321 return nil, err322 }323 res := &kafka.TopicConfig2_1{324 CleanupPolicy: kafka.TopicConfig2_1_CleanupPolicy(kafka.TopicConfig2_1_CleanupPolicy_value[topicConfig.CleanupPolicy]),325 CompressionType: topicConfig.CompressionType,326 DeleteRetentionMs: topicConfig.DeleteRetentionMs,327 FileDeleteDelayMs: topicConfig.FileDeleteDelayMs,328 FlushMessages: topicConfig.FlushMessages,329 FlushMs: topicConfig.FlushMs,330 MinCompactionLagMs: topicConfig.MinCompactionLagMs,331 RetentionBytes: topicConfig.RetentionBytes,332 RetentionMs: topicConfig.RetentionMs,333 MaxMessageBytes: topicConfig.MaxMessageBytes,334 MinInsyncReplicas: topicConfig.MinInsyncReplicas,335 SegmentBytes: topicConfig.SegmentBytes,336 Preallocate: topicConfig.Preallocate,337 }338 return res, nil339}340func expandKafkaTopicConfig2_8(d *schema.ResourceData, topicConfigPrefix string) (*kafka.TopicConfig2_8, error) {341 topicConfig, err := parseKafkaTopicConfig(d, topicConfigPrefix)342 if err != nil {343 return nil, err344 }345 res := &kafka.TopicConfig2_8{346 CleanupPolicy: kafka.TopicConfig2_8_CleanupPolicy(kafka.TopicConfig2_8_CleanupPolicy_value[topicConfig.CleanupPolicy]),347 CompressionType: topicConfig.CompressionType,348 DeleteRetentionMs: topicConfig.DeleteRetentionMs,349 FileDeleteDelayMs: topicConfig.FileDeleteDelayMs,350 FlushMessages: topicConfig.FlushMessages,351 FlushMs: topicConfig.FlushMs,352 MinCompactionLagMs: topicConfig.MinCompactionLagMs,353 RetentionBytes: topicConfig.RetentionBytes,354 RetentionMs: topicConfig.RetentionMs,355 MaxMessageBytes: topicConfig.MaxMessageBytes,356 MinInsyncReplicas: topicConfig.MinInsyncReplicas,357 SegmentBytes: topicConfig.SegmentBytes,358 Preallocate: topicConfig.Preallocate,359 }360 return res, nil361}362func expandKafkaTopicConfig3x(d *schema.ResourceData, topicConfigPrefix string) (*kafka.TopicConfig3, error) {363 topicConfig, err := parseKafkaTopicConfig(d, topicConfigPrefix)364 if err != nil {365 return nil, err366 }367 res := &kafka.TopicConfig3{368 CleanupPolicy: kafka.TopicConfig3_CleanupPolicy(kafka.TopicConfig3_CleanupPolicy_value[topicConfig.CleanupPolicy]),369 CompressionType: topicConfig.CompressionType,370 DeleteRetentionMs: topicConfig.DeleteRetentionMs,371 FileDeleteDelayMs: topicConfig.FileDeleteDelayMs,372 FlushMessages: topicConfig.FlushMessages,373 FlushMs: topicConfig.FlushMs,374 MinCompactionLagMs: topicConfig.MinCompactionLagMs,375 RetentionBytes: topicConfig.RetentionBytes,376 RetentionMs: topicConfig.RetentionMs,377 MaxMessageBytes: topicConfig.MaxMessageBytes,378 MinInsyncReplicas: topicConfig.MinInsyncReplicas,379 SegmentBytes: topicConfig.SegmentBytes,380 Preallocate: topicConfig.Preallocate,381 }382 return res, nil383}384func expandKafkaConfigSpec(d *schema.ResourceData) (*kafka.ConfigSpec, error) {385 result := &kafka.ConfigSpec{}386 if v, ok := d.GetOk("config.0.version"); ok {387 result.Version = v.(string)388 }389 if v, ok := d.GetOk("config.0.brokers_count"); ok {390 result.BrokersCount = &wrappers.Int64Value{Value: int64(v.(int))}391 }392 if v, ok := d.GetOk("config.0.assign_public_ip"); ok {393 result.AssignPublicIp = v.(bool)394 }395 if v, ok := d.GetOk("config.0.unmanaged_topics"); ok {396 result.UnmanagedTopics = v.(bool)397 }398 if v, ok := d.GetOk("config.0.schema_registry"); ok {399 result.SchemaRegistry = v.(bool)400 }401 if v, ok := d.GetOk("config.0.zones"); ok {402 zones := 
v.([]interface{})403 result.ZoneId = []string{}404 for _, zone := range zones {405 result.ZoneId = append(result.ZoneId, zone.(string))406 }407 }408 result.Kafka = &kafka.ConfigSpec_Kafka{}409 result.Kafka.Resources = expandKafkaResources(d, "config.0.kafka.0.resources.0")410 switch version := result.Version; version {411 case "3.0", "3.1", "3.2":412 cfg, err := expandKafkaConfig3x(d)413 if err != nil {414 return nil, err415 }416 result.Kafka.SetKafkaConfig_3(cfg)417 case "2.8":418 cfg, err := expandKafkaConfig2_8(d)419 if err != nil {420 return nil, err421 }422 result.Kafka.SetKafkaConfig_2_8(cfg)423 case "2.6":424 cfg, err := expandKafkaConfig2_6(d)425 if err != nil {426 return nil, err427 }428 result.Kafka.SetKafkaConfig_2_6(cfg)429 case "2.1":430 cfg, err := expandKafkaConfig2_1(d)431 if err != nil {432 return nil, err433 }434 result.Kafka.SetKafkaConfig_2_1(cfg)435 default:436 return nil, fmt.Errorf("you must specify version of Kafka")437 }438 if _, ok := d.GetOk("config.0.zookeeper"); ok {439 result.Zookeeper = &kafka.ConfigSpec_Zookeeper{}440 result.Zookeeper.Resources = expandKafkaResources(d, "config.0.zookeeper.0.resources.0")441 }442 result.SetAccess(expandKafkaAccess(d))443 return result, nil444}445func expandKafkaTopics(d *schema.ResourceData) ([]*kafka.TopicSpec, error) {446 var result []*kafka.TopicSpec447 version, ok := d.GetOk("config.0.version")448 if !ok {449 return nil, fmt.Errorf("you must specify version of Kafka")450 }451 topics := d.Get("topic").([]interface{})452 for idx := range topics {453 topicSpec, err := buildKafkaTopicSpec(d, fmt.Sprintf("topic.%d.", idx), version.(string))454 if err != nil {455 return nil, err456 }457 result = append(result, topicSpec)458 }459 return result, nil460}461func expandKafkaUsers(d *schema.ResourceData) ([]*kafka.UserSpec, error) {462 users := d.Get("user").(*schema.Set)463 result := make([]*kafka.UserSpec, 0, users.Len())464 for _, u := range users.List() {465 user, err := expandKafkaUser(u)466 if err != nil {467 return nil, err468 }469 result = append(result, user)470 }471 return result, nil472}473func expandKafkaUser(u interface{}) (*kafka.UserSpec, error) {474 m := u.(map[string]interface{})475 user := &kafka.UserSpec{}476 if v, ok := m["name"]; ok {477 user.Name = v.(string)478 }479 if v, ok := m["password"]; ok {480 user.Password = v.(string)481 }482 if v, ok := m["permission"]; ok {483 permissions, err := expandKafkaPermissions(v.(*schema.Set))484 if err != nil {485 return nil, err486 }487 user.Permissions = permissions488 }489 return user, nil490}491func expandKafkaPermissions(ps *schema.Set) ([]*kafka.Permission, error) {492 result := []*kafka.Permission{}493 for _, p := range ps.List() {494 m := p.(map[string]interface{})495 permission := &kafka.Permission{}496 if v, ok := m["topic_name"]; ok {497 permission.TopicName = v.(string)498 }499 if v, ok := m["role"]; ok {500 role, err := parseKafkaPermission(v.(string))501 if err != nil {502 return nil, err503 }504 permission.Role = role505 }506 result = append(result, permission)507 }508 return result, nil509}510func flattenKafkaConfig(cluster *kafka.Cluster) ([]map[string]interface{}, error) {511 kafkaResources, err := flattenKafkaResources(cluster.Config.Kafka.Resources)512 if err != nil {513 return nil, err514 }515 var kafkaConfig map[string]interface{}516 if cluster.Config.Kafka.GetKafkaConfig_2_6() != nil {517 kafkaConfig, err = flattenKafkaConfig2_6Settings(cluster.Config.Kafka.GetKafkaConfig_2_6())518 if err != nil {519 return nil, err520 }521 }522 if 
cluster.Config.Kafka.GetKafkaConfig_2_1() != nil {523 kafkaConfig, err = flattenKafkaConfig2_1Settings(cluster.Config.Kafka.GetKafkaConfig_2_1())524 if err != nil {525 return nil, err526 }527 }528 if cluster.Config.Kafka.GetKafkaConfig_2_8() != nil {529 kafkaConfig, err = flattenKafkaConfig2_8Settings(cluster.Config.Kafka.GetKafkaConfig_2_8())530 if err != nil {531 return nil, err532 }533 }534 if cluster.Config.Kafka.GetKafkaConfig_3() != nil {535 kafkaConfig, err = flattenKafkaConfig3Settings(cluster.Config.Kafka.GetKafkaConfig_3())536 if err != nil {537 return nil, err538 }539 }540 config := map[string]interface{}{541 "brokers_count": cluster.Config.BrokersCount.GetValue(),542 "assign_public_ip": cluster.Config.AssignPublicIp,543 "unmanaged_topics": cluster.Config.UnmanagedTopics,544 "schema_registry": cluster.Config.SchemaRegistry,545 "zones": cluster.Config.ZoneId,546 "version": cluster.Config.Version,547 "kafka": []map[string]interface{}{548 {549 "resources": []map[string]interface{}{kafkaResources},550 "kafka_config": []map[string]interface{}{kafkaConfig},551 },552 },553 }554 if cluster.Config.Zookeeper != nil {555 zkResources, err := flattenKafkaResources(cluster.Config.Zookeeper.Resources)556 if err != nil {557 return nil, err558 }559 config["zookeeper"] = []map[string]interface{}{560 {561 "resources": []map[string]interface{}{zkResources},562 },563 }564 }565 if cluster.Config.GetAccess() != nil {566 config["access"] = flattenKafkaAccess(cluster.Config)567 }568 return []map[string]interface{}{config}, nil569}570type KafkaConfigSettings interface {571 GetCompressionType() kafka.CompressionType572 GetLogFlushIntervalMessages() *wrappers.Int64Value573 GetLogFlushIntervalMs() *wrappers.Int64Value574 GetLogFlushSchedulerIntervalMs() *wrappers.Int64Value575 GetLogRetentionBytes() *wrappers.Int64Value576 GetLogRetentionHours() *wrappers.Int64Value577 GetLogRetentionMinutes() *wrappers.Int64Value578 GetLogRetentionMs() *wrappers.Int64Value579 GetLogSegmentBytes() *wrappers.Int64Value580 GetLogPreallocate() *wrappers.BoolValue581 GetSocketSendBufferBytes() *wrappers.Int64Value582 GetSocketReceiveBufferBytes() *wrappers.Int64Value583 GetAutoCreateTopicsEnable() *wrappers.BoolValue584 GetNumPartitions() *wrappers.Int64Value585 GetDefaultReplicationFactor() *wrappers.Int64Value586}587func flattenKafkaConfigSettings(kafkaConfig KafkaConfigSettings) (map[string]interface{}, error) {588 res := map[string]interface{}{}589 if kafkaConfig.GetCompressionType() != kafka.CompressionType_COMPRESSION_TYPE_UNSPECIFIED {590 res["compression_type"] = kafkaConfig.GetCompressionType().String()591 }592 if kafkaConfig.GetLogFlushIntervalMessages() != nil {593 res["log_flush_interval_messages"] = strconv.FormatInt(kafkaConfig.GetLogFlushIntervalMessages().GetValue(), 10)594 }595 if kafkaConfig.GetLogFlushIntervalMs() != nil {596 res["log_flush_interval_ms"] = strconv.FormatInt(kafkaConfig.GetLogFlushIntervalMs().GetValue(), 10)597 }598 if kafkaConfig.GetLogFlushSchedulerIntervalMs() != nil {599 res["log_flush_scheduler_interval_ms"] = strconv.FormatInt(kafkaConfig.GetLogFlushSchedulerIntervalMs().GetValue(), 10)600 }601 if kafkaConfig.GetLogRetentionBytes() != nil {602 res["log_retention_bytes"] = strconv.FormatInt(kafkaConfig.GetLogRetentionBytes().GetValue(), 10)603 }604 if kafkaConfig.GetLogRetentionHours() != nil {605 res["log_retention_hours"] = strconv.FormatInt(kafkaConfig.GetLogRetentionHours().GetValue(), 10)606 }607 if kafkaConfig.GetLogRetentionMinutes() != nil {608 res["log_retention_minutes"] = 
strconv.FormatInt(kafkaConfig.GetLogRetentionMinutes().GetValue(), 10)609 }610 if kafkaConfig.GetLogRetentionMs() != nil {611 res["log_retention_ms"] = strconv.FormatInt(kafkaConfig.GetLogRetentionMs().GetValue(), 10)612 }613 if kafkaConfig.GetLogSegmentBytes() != nil {614 res["log_segment_bytes"] = strconv.FormatInt(kafkaConfig.GetLogSegmentBytes().GetValue(), 10)615 }616 if kafkaConfig.GetLogPreallocate() != nil {617 res["log_preallocate"] = kafkaConfig.GetLogPreallocate().GetValue()618 }619 if kafkaConfig.GetSocketSendBufferBytes() != nil {620 res["socket_send_buffer_bytes"] = strconv.FormatInt(kafkaConfig.GetSocketSendBufferBytes().GetValue(), 10)621 }622 if kafkaConfig.GetSocketReceiveBufferBytes() != nil {623 res["socket_receive_buffer_bytes"] = strconv.FormatInt(kafkaConfig.GetSocketReceiveBufferBytes().GetValue(), 10)624 }625 if kafkaConfig.GetAutoCreateTopicsEnable() != nil {626 res["auto_create_topics_enable"] = kafkaConfig.GetAutoCreateTopicsEnable().GetValue()627 }628 if kafkaConfig.GetNumPartitions() != nil {629 res["num_partitions"] = strconv.FormatInt(kafkaConfig.GetNumPartitions().GetValue(), 10)630 }631 if kafkaConfig.GetDefaultReplicationFactor() != nil {632 res["default_replication_factor"] = strconv.FormatInt(kafkaConfig.GetDefaultReplicationFactor().GetValue(), 10)633 }634 return res, nil635}636func flattenKafkaConfig2_6Settings(r *kafka.KafkaConfig2_6) (map[string]interface{}, error) {637 return flattenKafkaConfigSettings(r)638}639func flattenKafkaConfig2_1Settings(r *kafka.KafkaConfig2_1) (map[string]interface{}, error) {640 return flattenKafkaConfigSettings(r)641}642func flattenKafkaConfig2_8Settings(r *kafka.KafkaConfig2_8) (map[string]interface{}, error) {643 return flattenKafkaConfigSettings(r)644}645func flattenKafkaConfig3Settings(r *kafka.KafkaConfig3) (map[string]interface{}, error) {646 return flattenKafkaConfigSettings(r)647}648func flattenKafkaResources(r *kafka.Resources) (map[string]interface{}, error) {649 res := map[string]interface{}{}650 res["resource_preset_id"] = r.ResourcePresetId651 res["disk_type_id"] = r.DiskTypeId652 res["disk_size"] = toGigabytes(r.DiskSize)653 return res, nil654}655func expandKafkaResources(d *schema.ResourceData, rootKey string) *kafka.Resources {656 resources := &kafka.Resources{}657 if v, ok := d.GetOk(rootKey + ".resource_preset_id"); ok {658 resources.ResourcePresetId = v.(string)659 }660 if v, ok := d.GetOk(rootKey + ".disk_size"); ok {661 resources.DiskSize = toBytes(v.(int))662 }663 if v, ok := d.GetOk(rootKey + ".disk_type_id"); ok {664 resources.DiskTypeId = v.(string)665 }666 return resources667}668func kafkaUserHash(v interface{}) int {669 var buf bytes.Buffer670 m := v.(map[string]interface{})671 if n, ok := m["name"]; ok {672 buf.WriteString(fmt.Sprintf("%s-", n.(string)))673 }674 if p, ok := m["password"]; ok {675 buf.WriteString(fmt.Sprintf("%s-", p.(string)))676 }677 if ps, ok := m["permission"]; ok {678 buf.WriteString(fmt.Sprintf("%v-", ps.(*schema.Set).List()))679 }680 return hashcode.String(buf.String())681}682func kafkaUserPermissionHash(v interface{}) int {683 var buf bytes.Buffer684 m := v.(map[string]interface{})685 if n, ok := m["topic_name"]; ok {686 buf.WriteString(fmt.Sprintf("%s-", n.(string)))687 }688 if r, ok := m["role"]; ok {689 buf.WriteString(fmt.Sprintf("%v-", r))690 }691 return hashcode.String(buf.String())692}693func kafkaHostHash(v interface{}) int {694 var buf bytes.Buffer695 m := v.(map[string]interface{})696 if n, ok := m["name"]; ok {697 buf.WriteString(fmt.Sprintf("%s-", 
n.(string)))698 }699 return hashcode.String(buf.String())700}701func flattenKafkaTopics(topics []*kafka.Topic) []map[string]interface{} {702 result := make([]map[string]interface{}, 0)703 for _, d := range topics {704 m := make(map[string]interface{})705 m["name"] = d.GetName()706 m["partitions"] = d.GetPartitions().GetValue()707 m["replication_factor"] = d.GetReplicationFactor().GetValue()708 var cfg map[string]interface{}709 if d.GetTopicConfig_2_6() != nil {710 cfg = flattenKafkaTopicConfig2_6(d.GetTopicConfig_2_6())711 }712 if d.GetTopicConfig_2_1() != nil {713 cfg = flattenKafkaTopicConfig2_1(d.GetTopicConfig_2_1())714 }715 if d.GetTopicConfig_2_8() != nil {716 cfg = flattenKafkaTopicConfig2_8(d.GetTopicConfig_2_8())717 }718 if d.GetTopicConfig_3() != nil {719 cfg = flattenKafkaTopicConfig3(d.GetTopicConfig_3())720 }721 if len(cfg) != 0 {722 m["topic_config"] = []map[string]interface{}{cfg}723 }724 result = append(result, m)725 }726 return result727}728type TopicConfigSpec interface {729 GetCompressionType() kafka.CompressionType730 GetDeleteRetentionMs() *wrappers.Int64Value731 GetFileDeleteDelayMs() *wrappers.Int64Value732 GetFlushMessages() *wrappers.Int64Value733 GetFlushMs() *wrappers.Int64Value734 GetMinCompactionLagMs() *wrappers.Int64Value735 GetRetentionBytes() *wrappers.Int64Value736 GetRetentionMs() *wrappers.Int64Value737 GetMaxMessageBytes() *wrappers.Int64Value738 GetMinInsyncReplicas() *wrappers.Int64Value739 GetSegmentBytes() *wrappers.Int64Value740 GetPreallocate() *wrappers.BoolValue741}742func flattenKafkaTopicConfig(topicConfig TopicConfigSpec) map[string]interface{} {743 result := make(map[string]interface{})744 if topicConfig.GetCompressionType() != kafka.CompressionType_COMPRESSION_TYPE_UNSPECIFIED {745 result["compression_type"] = topicConfig.GetCompressionType().String()746 }747 if topicConfig.GetDeleteRetentionMs() != nil {748 result["delete_retention_ms"] = strconv.FormatInt(topicConfig.GetDeleteRetentionMs().GetValue(), 10)749 }750 if topicConfig.GetFileDeleteDelayMs() != nil {751 result["file_delete_delay_ms"] = strconv.FormatInt(topicConfig.GetFileDeleteDelayMs().GetValue(), 10)752 }753 if topicConfig.GetFlushMessages() != nil {754 result["flush_messages"] = strconv.FormatInt(topicConfig.GetFlushMessages().GetValue(), 10)755 }756 if topicConfig.GetFlushMs() != nil {757 result["flush_ms"] = strconv.FormatInt(topicConfig.GetFlushMs().GetValue(), 10)758 }759 if topicConfig.GetMinCompactionLagMs() != nil {760 result["min_compaction_lag_ms"] = strconv.FormatInt(topicConfig.GetMinCompactionLagMs().GetValue(), 10)761 }762 if topicConfig.GetRetentionBytes() != nil {763 result["retention_bytes"] = strconv.FormatInt(topicConfig.GetRetentionBytes().GetValue(), 10)764 }765 if topicConfig.GetRetentionMs() != nil {766 result["retention_ms"] = strconv.FormatInt(topicConfig.GetRetentionMs().GetValue(), 10)767 }768 if topicConfig.GetMaxMessageBytes() != nil {769 result["max_message_bytes"] = strconv.FormatInt(topicConfig.GetMaxMessageBytes().GetValue(), 10)770 }771 if topicConfig.GetMinInsyncReplicas() != nil {772 result["min_insync_replicas"] = strconv.FormatInt(topicConfig.GetMinInsyncReplicas().GetValue(), 10)773 }774 if topicConfig.GetSegmentBytes() != nil {775 result["segment_bytes"] = strconv.FormatInt(topicConfig.GetSegmentBytes().GetValue(), 10)776 }777 if topicConfig.GetPreallocate() != nil {778 result["preallocate"] = topicConfig.GetPreallocate().GetValue()779 }780 return result781}782func flattenKafkaTopicConfig2_6(topicConfig *kafka.TopicConfig2_6) 
map[string]interface{} {783 result := flattenKafkaTopicConfig(topicConfig)784 if topicConfig.GetCleanupPolicy() != kafka.TopicConfig2_6_CLEANUP_POLICY_UNSPECIFIED {785 result["cleanup_policy"] = topicConfig.GetCleanupPolicy().String()786 }787 return result788}789func flattenKafkaTopicConfig2_1(topicConfig *kafka.TopicConfig2_1) map[string]interface{} {790 result := flattenKafkaTopicConfig(topicConfig)791 if topicConfig.GetCleanupPolicy() != kafka.TopicConfig2_1_CLEANUP_POLICY_UNSPECIFIED {792 result["cleanup_policy"] = topicConfig.GetCleanupPolicy().String()793 }794 return result795}796func flattenKafkaTopicConfig2_8(topicConfig *kafka.TopicConfig2_8) map[string]interface{} {797 result := flattenKafkaTopicConfig(topicConfig)798 if topicConfig.GetCleanupPolicy() != kafka.TopicConfig2_8_CLEANUP_POLICY_UNSPECIFIED {799 result["cleanup_policy"] = topicConfig.GetCleanupPolicy().String()800 }801 return result802}803func flattenKafkaTopicConfig3(topicConfig *kafka.TopicConfig3) map[string]interface{} {804 result := flattenKafkaTopicConfig(topicConfig)805 if topicConfig.GetCleanupPolicy() != kafka.TopicConfig3_CLEANUP_POLICY_UNSPECIFIED {806 result["cleanup_policy"] = topicConfig.GetCleanupPolicy().String()807 }808 return result809}810func flattenKafkaUsers(users []*kafka.User, passwords map[string]string) *schema.Set {811 result := schema.NewSet(kafkaUserHash, nil)812 for _, user := range users {813 u := map[string]interface{}{}814 u["name"] = user.Name815 perms := schema.NewSet(kafkaUserPermissionHash, nil)816 for _, perm := range user.Permissions {817 p := map[string]interface{}{}818 p["topic_name"] = perm.TopicName819 p["role"] = perm.Role.String()820 perms.Add(p)...
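
The parse helpers above all follow the same pattern: look the string up in the generated enum map, then reject both unknown values and the UNSPECIFIED sentinel. A self-contained sketch of that pattern, distilled from parseKafkaTopicCleanupPolicy (the map is copied from the snippet; main is only for illustration):

package main

import "fmt"

var cleanupPolicyValue = map[string]int32{
  "CLEANUP_POLICY_UNSPECIFIED":        0,
  "CLEANUP_POLICY_DELETE":             1,
  "CLEANUP_POLICY_COMPACT":            2,
  "CLEANUP_POLICY_COMPACT_AND_DELETE": 3,
}

// parseCleanupPolicy mirrors the snippet's validation: unknown values and
// the UNSPECIFIED sentinel are both rejected.
func parseCleanupPolicy(e string) (int32, error) {
  v, ok := cleanupPolicyValue[e]
  if !ok || e == "CLEANUP_POLICY_UNSPECIFIED" {
    return 0, fmt.Errorf("value for 'cleanup_policy' must not be `%s`", e)
  }
  return v, nil
}

func main() {
  for _, p := range []string{"CLEANUP_POLICY_COMPACT", "CLEANUP_POLICY_UNSPECIFIED", "bogus"} {
    v, err := parseCleanupPolicy(p)
    fmt.Println(p, v, err)
  }
}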

upstash_kafka_topic_test.go

Source: upstash_kafka_topic_test.go (GitHub)

...
  kafka_topic_partitions = envVars.KafkaTopicPartitions
  kafka_topic_retention_time = envVars.KafkaTopicRetentionTime
  kafka_topic_retention_size = envVars.KafkaTopicRetentionSize
  kafka_topic_max_message_size = envVars.KafkaTopicMaxMessageSize
  kafka_topic_cleanup_policy = envVars.KafkaTopicCleanupPolicy
  terraformOptions := kafkaTopicOptions(t)
  defer terraform.Destroy(t, terraformOptions)
  terraform.Apply(t, terraformOptions)
  terraform.Plan(t, terraformOptions)
  kafkaTopicAsserter(t, terraformOptions)
  UpstashKafkaTopicRecreate(t)
  UpstashKafkaTopicUpdate(t)
}

func UpstashKafkaTopicRecreate(t *testing.T) {
  kafka_topic_name = kafka_topic_name + "Updated"
  kafka_topic_partitions = kafka_topic_partitions + 1
  kafka_topic_retention_time = kafka_topic_retention_time * 15
  kafka_topic_retention_size = kafka_topic_retention_size * 15
  // kafka_topic_max_message_size = kafka_topic_max_message_size...
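
The test above registers teardown with defer. Go's testing package also offers t.Cleanup, which runs the registered function after the test and all of its subtests finish. A sketch of the same teardown using t.Cleanup; it assumes the kafkaTopicOptions and kafkaTopicAsserter helpers from the snippet and terratest's terraform module:

package kafka_test

import (
  "testing"

  "github.com/gruntwork-io/terratest/modules/terraform"
)

func TestUpstashKafkaTopic(t *testing.T) {
  terraformOptions := kafkaTopicOptions(t)
  // t.Cleanup fires even if the test later spawns subtests or helpers,
  // mirroring the defer in the snippet above
  t.Cleanup(func() { terraform.Destroy(t, terraformOptions) })
  terraform.Apply(t, terraformOptions)
  kafkaTopicAsserter(t, terraformOptions)
}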

broker.go

Source: broker.go (GitHub)

...
  <-kafka.consumer.ready
  time.Sleep(1 * time.Second)
  channelReady <- true
}

func (kafka *broker) waitUntilCleanupIsReady() (err error) {
  err = <-kafka.cleanupReady
  close(kafka.cleanupReady)
  return err
}

func (kafka *broker) waitForCloseSignal(closeCallback common.CloseCallback) {
  var err error
  sigterm := make(chan os.Signal, 1)
  signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
  select {
  case <-kafka.ctx.Done():
    err = errors.New("terminating: context cancelled")
  case <-sigterm:
    kafka.cancel()
    err = errors.New("terminating: signal notified")
  }
  cErr := kafka.waitUntilCleanupIsReady()
  if cErr != nil {
    err = fmt.Errorf("%s: %s", err.Error(), cErr.Error())
  }
  kafka.cleanup()
  closeCallback(kafka.ctx, err)
}

func (kafka *broker) cleanup() {
  _ = kafka.producer.Close()
  _ = kafka.consumerGroup.Close()
}
...
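
The cleanupReady channel above is a one-shot handshake: a worker goroutine reports the result of its shutdown work, and waitUntilCleanupIsReady blocks until that report arrives before the final Close calls run. A minimal self-contained sketch of the same pattern (names and the error are illustrative, not from the snippet):

package main

import (
  "errors"
  "fmt"
)

type broker struct {
  cleanupReady chan error
}

// run simulates a consumer loop; when it exits it reports its shutdown
// result exactly once on cleanupReady.
func (b *broker) run() {
  // ... consume messages until told to stop ...
  b.cleanupReady <- errors.New("rebalance in progress") // nil on a clean exit
}

func (b *broker) waitUntilCleanupIsReady() error {
  err := <-b.cleanupReady
  close(b.cleanupReady)
  return err
}

func main() {
  b := &broker{cleanupReady: make(chan error, 1)}
  go b.run()
  if err := b.waitUntilCleanupIsReady(); err != nil {
    fmt.Println("cleanup finished with:", err)
  }
  // only now is it safe to close producers and consumer groups
}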

Cleanup

Using AI Code Generation

package main

import (
  "log"

  "github.com/Shopify/sarama"
)

func main() {
  config := sarama.NewConfig()
  client, err := sarama.NewClient([]string{"localhost:9092"}, config)
  if err != nil {
    log.Panic(err)
  }
  defer client.Close()
  consumer, err := sarama.NewConsumerFromClient(client)
  if err != nil {
    log.Panic(err)
  }
  defer consumer.Close()
  partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
  if err != nil {
    log.Panic(err)
  }
  defer partitionConsumer.Close()
  for msg := range partitionConsumer.Messages() {
    log.Println(string(msg.Value))
  }
  for err := range partitionConsumer.Errors() {
    log.Println(err)
  }
}

Cleanup

Using AI Code Generation

package main

import (
  "fmt"
  "log"
  "os"
  "os/signal"
  "syscall"

  "github.com/Shopify/sarama"
)

func main() {
  config := sarama.NewConfig()
  master, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
  if err != nil {
    panic(err)
  }
  defer func() {
    if err := master.Close(); err != nil {
      panic(err)
    }
  }()
  consumer, err := master.ConsumePartition("test", 0, sarama.OffsetNewest)
  if err != nil {
    panic(err)
  }
  defer func() {
    if err := consumer.Close(); err != nil {
      panic(err)
    }
  }()
  signals := make(chan os.Signal, 1)
  signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
  for {
    select {
    case err := <-consumer.Errors():
      log.Println(err)
    case msg := <-consumer.Messages():
      fmt.Printf("Message claimed: value = %s, offset = %d, topic = %s\n", string(msg.Value), msg.Offset, msg.Topic)
    case <-signals:
      // on SIGINT/SIGTERM, return and let the deferred Close calls
      // shut the consumer down cleanly
      return
    }
  }
}

Cleanup

Using AI Code Generation

1func main() {2 kafka := &Kafka{}3 kafka.Cleanup()4}5func main() {6 kafka := &Kafka{}7 kafka.Cleanup()8}9import (10func main() {11 dat, err := ioutil.ReadFile("file.txt")12 if err != nil {13 panic(err)14 }15 fmt.Print(string(dat))16}17main.main()18runtime.goexit()19I am using Go 1.4.2 on Ubuntu 14.04. I have tried to run the program from the terminal and from within my IDE (GoLand). I have tried to run the program from the directory in which the file is located and from a different directory. I have also tried to run the program with the full path to the file. I have tried to run the program with and without the .txt extension. I have tried to run the program with and without the quotation marks. I have tried to run the program with and without the parentheses. I have tried to run the program with and without the semicolon. I have tried to run the program with and without the period. I have tried to run the program with and without the comma. I have tried to run the program with and without the space. I have tried to run the program with and without the colon. I have tried to run the program with and without the forward slash. I have tried to run the program with and without the backslash. I have tried to run the program with

Cleanup

Using AI Code Generation

package main

import (
  "fmt"
  "log"

  "github.com/Shopify/sarama"
)

func main() {
  consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
  if err != nil {
    log.Fatal(err)
  }
  partitions, err := consumer.Partitions("test")
  if err != nil {
    log.Fatal(err)
  }
  for _, partition := range partitions {
    partitionConsumer, err := consumer.ConsumePartition("test", partition, sarama.OffsetNewest)
    if err != nil {
      log.Fatal(err)
    }
    for msg := range partitionConsumer.Messages() {
      fmt.Println(string(msg.Key), string(msg.Value))
    }
    partitionConsumer.Close()
  }
  consumer.Close()
}
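
The loop above consumes partitions one at a time and only moves on when a partition's message channel closes. A sketch of consuming all partitions concurrently instead, with deferred Close calls as the per-partition cleanup (the structure is illustrative; each loop still runs until the consumer is shut down elsewhere):

package main

import (
  "fmt"
  "log"
  "sync"

  "github.com/Shopify/sarama"
)

func main() {
  consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
  if err != nil {
    log.Fatal(err)
  }
  defer consumer.Close()

  partitions, err := consumer.Partitions("test")
  if err != nil {
    log.Fatal(err)
  }

  var wg sync.WaitGroup
  for _, partition := range partitions {
    pc, err := consumer.ConsumePartition("test", partition, sarama.OffsetNewest)
    if err != nil {
      log.Fatal(err)
    }
    wg.Add(1)
    go func(pc sarama.PartitionConsumer) {
      defer wg.Done()
      defer pc.Close() // per-partition cleanup once the channel closes
      for msg := range pc.Messages() {
        fmt.Println(string(msg.Key), string(msg.Value))
      }
    }(pc)
  }
  wg.Wait()
}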

Cleanup

Using AI Code Generation

package main

import (
  "fmt"

  "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
  // the scraped snippet lost the ConfigMap contents, the topic name, and
  // parts of the delivery-report handling; the values below are placeholders
  topic := "test"
  producer, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092",
  })
  if err != nil {
    panic(err)
  }
  consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092",
    "group.id":          "test-group",
    "auto.offset.reset": "earliest",
  })
  if err != nil {
    panic(err)
  }
  defer producer.Close()
  defer consumer.Close()
  pchan := make(chan kafka.Event)
  for i := 0; i < 10; i++ {
    producer.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(fmt.Sprintf("Message #%d", i)),
    }, pchan)
    e := <-pchan
    m := e.(*kafka.Message)
    if m.TopicPartition.Error != nil {
      fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
    } else {
      fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
        *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }
  }
  consumer.SubscribeTopics([]string{topic}, nil)
  for {
    msg, err := consumer.ReadMessage(-1)
    if err == nil {
      fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    } else {
      fmt.Printf("Consumer error: %v (%v)\n", err, msg)
    }
  }
}
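
A deferred Close alone may drop messages still sitting in librdkafka's internal queue. The usual cleanup step with confluent-kafka-go is to Flush the producer first; a sketch (the timeout value is arbitrary):

package main

import (
  "fmt"

  "github.com/confluentinc/confluent-kafka-go/kafka"
)

// closeProducer flushes whatever librdkafka still buffers, reports anything
// that could not be delivered, then closes the producer.
func closeProducer(p *kafka.Producer) {
  // Flush takes a timeout in milliseconds and returns the number of
  // events still unflushed when it gives up
  if remaining := p.Flush(15 * 1000); remaining > 0 {
    fmt.Printf("%d delivery events were not flushed\n", remaining)
  }
  p.Close()
}

func main() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
  if err != nil {
    panic(err)
  }
  defer closeProducer(p)
  // ... produce messages ...
}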

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Try LambdaTest Now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud
