Best Cerberus-source code snippet using org.cerberus.service.kafka.impl.KafkaService.getKafkaConsumerKey
Source:KafkaService.java
...83 @Autowired84 IJsonService jsonService;85 protected final Logger LOG = org.apache.logging.log4j.LogManager.getLogger(getClass());86 @Override87 public String getKafkaConsumerKey(String topic, String bootstrapServers) {88 return topic + "|" + bootstrapServers;89 }90 @Override91 public AnswerItem<AppService> produceEvent(String topic, String key, String eventMessage,92 String bootstrapServers,93 List<AppServiceHeader> serviceHeader) throws InterruptedException, ExecutionException {94 MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA);;95 AnswerItem<AppService> result = new AnswerItem<>();96 AppService serviceREST = factoryAppService.create("", AppService.TYPE_KAFKA, AppService.METHOD_KAFKAPRODUCE, "", "", "", "", "", "", "", "", "", "", "",97 "", null, "", null, null);98 Properties props = new Properties();99 serviceHeader.add(factoryAppServiceHeader.create(null, "bootstrap.servers", bootstrapServers, "Y", 0, "", "", null, "", null));100 serviceHeader.add(factoryAppServiceHeader.create(null, "enable.idempotence", "true", "Y", 0, "", "", null, "", null));101 serviceHeader.add(factoryAppServiceHeader.create(null, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));102 serviceHeader.add(factoryAppServiceHeader.create(null, "value.serializer", "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));103 for (AppServiceHeader object : serviceHeader) {104 if (StringUtil.parseBoolean(object.getActive())) {105 props.put(object.getKey(), object.getValue());106 }107 }108 serviceREST.setServicePath(bootstrapServers);109 serviceREST.setKafkaTopic(topic);110 serviceREST.setKafkaKey(key);111 serviceREST.setServiceRequest(eventMessage);112 serviceREST.setHeaderList(serviceHeader);113 KafkaProducer<String, String> producer = new KafkaProducer<>(props);114 int partition = -1;115 long offset = -1;116 try {117 ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, eventMessage);118 LOG.debug("Producing Kafka message - topic : " + topic + " key : " + key + " message : " + eventMessage);119 RecordMetadata metadata = producer.send(record).get(); //Wait for a responses120 partition = metadata.partition();121 offset = metadata.offset();122 LOG.debug("Produced Kafka message - topic : " + topic + " key : " + key + " partition : " + partition + " offset : " + offset);123 message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_PRODUCEKAFKA);124 } catch (Exception ex) {125 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA);126 message.setDescription(message.getDescription().replace("%EX%", ex.toString()));127 LOG.debug(ex, ex);128 } finally {129 producer.flush();130 producer.close();131 LOG.info("Closed producer");132 }133 serviceREST.setKafkaResponseOffset(offset);134 serviceREST.setKafkaResponsePartition(partition);135 serviceREST.setResponseHTTPBodyContentType(appServiceService.guessContentType(serviceREST, AppService.RESPONSEHTTPBODYCONTENTTYPE_JSON));136 result.setItem(serviceREST);137 message.setDescription(message.getDescription().replace("%SERVICEMETHOD%", AppService.METHOD_KAFKAPRODUCE));138 message.setDescription(message.getDescription().replace("%TOPIC%", topic));139 message.setDescription(message.getDescription().replace("%PART%", String.valueOf(partition)));140 message.setDescription(message.getDescription().replace("%OFFSET%", String.valueOf(offset)));141 result.setResultMessage(message);142 return result;143 }144 @SuppressWarnings("unchecked")145 @Override146 public AnswerItem<Map<TopicPartition, Long>> seekEvent(String topic, String bootstrapServers,147 List<AppServiceHeader> serviceHeader) throws InterruptedException, ExecutionException {148 MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA);149 AnswerItem<Map<TopicPartition, Long>> result = new AnswerItem<>();150 KafkaConsumer consumer = null;151 try {152 Properties props = new Properties();153 serviceHeader.add(factoryAppServiceHeader.create(null, "bootstrap.servers", bootstrapServers, "Y", 0, "", "", null, "", null));154 serviceHeader.add(factoryAppServiceHeader.create(null, "enable.auto.commit", "false", "Y", 0, "", "", null, "", null));155 serviceHeader.add(factoryAppServiceHeader.create(null, "max.poll.records", "10", "Y", 0, "", "", null, "", null));156 serviceHeader.add(factoryAppServiceHeader.create(null, "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));157 serviceHeader.add(factoryAppServiceHeader.create(null, "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));158 for (AppServiceHeader object : serviceHeader) {159 if (StringUtil.parseBoolean(object.getActive())) {160 props.put(object.getKey(), object.getValue());161 }162 }163 LOG.info("Open Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));164 consumer = new KafkaConsumer<>(props);165 //Get a list of the topics' partitions166 List<PartitionInfo> partitionList = consumer.partitionsFor(topic);167 List<TopicPartition> topicPartitionList = partitionList.stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toList());168 //Assign all the partitions to this consumer169 consumer.assign(topicPartitionList);170 consumer.seekToEnd(topicPartitionList); //default to latest offset for all partitions171 HashMap<TopicPartition, Long> valueResult = new HashMap<>();172 Map<TopicPartition, Long> partitionOffset = consumer.endOffsets(topicPartitionList);173 result.setItem(partitionOffset);174 } catch (Exception ex) {175 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKKAFKA);176 message.setDescription(message.getDescription().replace("%EX%", ex.toString()).replace("%TOPIC%", topic));177 LOG.debug(ex, ex);178 } finally {179 consumer.close();180 LOG.info("Closed Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));181 }182 result.setResultMessage(message);183 return result;184 }185 @SuppressWarnings("unchecked")186 @Override187 public AnswerItem<String> searchEvent(Map<TopicPartition, Long> mapOffsetPosition, String topic, String bootstrapServers,188 List<AppServiceHeader> serviceHeader, String filterPath, String filterValue, int targetNbEventsInt, int targetNbSecInt) throws InterruptedException, ExecutionException {189 MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);190 AnswerItem<String> result = new AnswerItem<>();191 AppService serviceREST = factoryAppService.create("", AppService.TYPE_KAFKA, AppService.METHOD_KAFKASEARCH, "", "", "", "", "", "", "", "", "", "", "", "", null, "", null, null);192 Instant date1 = Instant.now();193 JSONArray resultJSON = new JSONArray();194 KafkaConsumer consumer = null;195 int nbFound = 0;196 int nbEvents = 0;197 try {198 Properties props = new Properties();199 serviceHeader.add(factoryAppServiceHeader.create(null, "bootstrap.servers", bootstrapServers, "Y", 0, "", "", null, "", null));200 serviceHeader.add(factoryAppServiceHeader.create(null, "enable.auto.commit", "false", "Y", 0, "", "", null, "", null));201 serviceHeader.add(factoryAppServiceHeader.create(null, "max.poll.records", "10", "Y", 0, "", "", null, "", null));202 serviceHeader.add(factoryAppServiceHeader.create(null, "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));203 serviceHeader.add(factoryAppServiceHeader.create(null, "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));204 for (AppServiceHeader object : serviceHeader) {205 if (StringUtil.parseBoolean(object.getActive())) {206 props.put(object.getKey(), object.getValue());207 }208 }209 LOG.info("Open Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));210 consumer = new KafkaConsumer<>(props);211 //Get a list of the topics' partitions212 List<PartitionInfo> partitionList = consumer.partitionsFor(topic);213 List<TopicPartition> topicPartitionList = partitionList.stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toList());214 //Assign all the partitions to this consumer215 consumer.assign(topicPartitionList);216 // Setting each partition to correct Offset.217 for (Map.Entry<TopicPartition, Long> entry : mapOffsetPosition.entrySet()) {218 consumer.seek(entry.getKey(), entry.getValue());219 LOG.debug("Partition : " + entry.getKey().partition() + " set to offset : " + entry.getValue());220 }221 boolean consume = true;222 long timeoutTime = Instant.now().plusSeconds(targetNbSecInt).toEpochMilli(); //default to 30 seconds223 int pollDurationSec = 5;224 if (targetNbSecInt < pollDurationSec) {225 pollDurationSec = targetNbSecInt;226 }227 while (consume) {228 LOG.debug("Start Poll.");229 @SuppressWarnings("unchecked")230 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(pollDurationSec));231 LOG.debug("End Poll.");232 if (Instant.now().toEpochMilli() > timeoutTime) {233 LOG.debug("Timed out searching for record");234 consumer.wakeup(); //exit235 }236 //Now for each record in the batch of records we got from Kafka237 for (ConsumerRecord<String, String> record : records) {238 try {239 LOG.debug("New record " + record.topic() + " " + record.partition() + " " + record.offset());240 LOG.debug(" " + record.key() + " | " + record.value());241 JSONObject recordJSON = new JSONObject(record.value());242 nbEvents++;243 boolean match = true;244 if (!StringUtil.isNullOrEmpty(filterPath)) {245 String recordJSONfiltered = "";246 try {247 recordJSONfiltered = jsonService.getStringFromJson(record.value(), filterPath);248 } catch (PathNotFoundException ex) {249 //Catch any exceptions thrown from message processing/testing as they should have already been reported/dealt with250 //but we don't want to trigger the catch block for Kafka consumption251 match = false;252 LOG.debug("Record discarded - Path not found.");253 } catch (Exception ex) {254 LOG.error(ex, ex);255 }256 LOG.debug("Filtered value : " + recordJSONfiltered);257 if (!recordJSONfiltered.equals(filterValue)) {258 match = false;259 LOG.debug("Record discarded - Value different.");260 }261 }262 if (match) {263 JSONObject messageJSON = new JSONObject();264 messageJSON.put("key", record.key());265 messageJSON.put("value", recordJSON);266 messageJSON.put("offset", record.offset());267 messageJSON.put("partition", record.partition());268 resultJSON.put(messageJSON);269 nbFound++;270 if (nbFound >= targetNbEventsInt) {271 consume = false; //exit the consume loop272 consumer.wakeup(); //takes effect on the next poll loop so need to break.273 break; //if we've found a match, stop looping through the current record batch274 }275 }276 } catch (Exception ex) {277 //Catch any exceptions thrown from message processing/testing as they should have already been reported/dealt with278 //but we don't want to trigger the catch block for Kafka consumption279 LOG.error(ex, ex);280 }281 }282 }283 result.setItem(resultJSON.toString());284 Instant date2 = Instant.now();285 Duration duration = Duration.between(date1, date2);286 message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA)287 .resolveDescription("NBEVENT", String.valueOf(nbFound))288 .resolveDescription("NBTOT", String.valueOf(nbEvents))289 .resolveDescription("NBSEC", String.valueOf(duration.getSeconds()));290 } catch (WakeupException e) {291 result.setItem(resultJSON.toString());292 Instant date2 = Instant.now();293 Duration duration = Duration.between(date1, date2);294 message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKAPARTIALRESULT)295 .resolveDescription("NBEVENT", String.valueOf(nbFound))296 .resolveDescription("NBTOT", String.valueOf(nbEvents))297 .resolveDescription("NBSEC", String.valueOf(duration.getSeconds()));298 //Ignore299 } catch (Exception ex) {300 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);301 message.setDescription(message.getDescription().replace("%EX%", ex.toString()));302 LOG.debug(ex, ex);303 } finally {304 if (consumer != null) {305 LOG.info("Closed Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));306 consumer.close();307 }308 }309 result.setItem(resultJSON.toString());310 message.setDescription(message.getDescription().replace("%SERVICEMETHOD%", AppService.METHOD_KAFKASEARCH).replace("%TOPIC%", topic));311 result.setResultMessage(message);312 return result;313 }314 @Override315 public HashMap<String, Map<TopicPartition, Long>> getAllConsumers(List<TestCaseStep> mainExecutionTestCaseStepList) throws CerberusException, InterruptedException, ExecutionException {316 HashMap<String, Map<TopicPartition, Long>> tempKafka = new HashMap<>();317 AnswerItem<Map<TopicPartition, Long>> resultConsume = new AnswerItem<>();318 for (TestCaseStep testCaseStep : mainExecutionTestCaseStepList) {319 for (TestCaseStepAction testCaseStepAction : testCaseStep.getTestCaseStepAction()) {320 if (testCaseStepAction.getAction().equals(TestCaseStepAction.ACTION_CALLSERVICE)321 && !testCaseStepAction.getConditionOper().equals(TestCaseStepAction.CONDITIONOPER_NEVER)) {322 AnswerItem<AppService> localService = appServiceService.readByKeyWithDependency(testCaseStepAction.getValue1(), "Y");323 if (localService.getItem() != null) {324 if (localService.getItem().getType().equals(AppService.TYPE_KAFKA) && localService.getItem().getMethod().equals(AppService.METHOD_KAFKASEARCH)) {325 resultConsume = seekEvent(localService.getItem().getKafkaTopic(), localService.getItem().getServicePath(), localService.getItem().getHeaderList());326 if (!(resultConsume.isCodeEquals(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA.getCode()))) {327 LOG.debug("TestCase interupted due to error when opening Kafka consume. " + resultConsume.getMessageDescription());328 throw new CerberusException(new MessageGeneral(MessageGeneralEnum.VALIDATION_FAILED_KAFKACONSUMERSEEK)329 .resolveDescription("DETAIL", resultConsume.getMessageDescription()));330 }331 tempKafka.put(getKafkaConsumerKey(localService.getItem().getKafkaTopic(), localService.getItem().getServicePath()), resultConsume.getItem());332 }333 }334 }335 }336 }337 LOG.debug(tempKafka.size() + " consumers lastest offset retrieved.");338 return tempKafka;339 }340}...
getKafkaConsumerKey
Using AI Code Generation
1import org.cerberus.service.kafka.impl.KafkaService;2import org.cerberus.service.kafka.impl.KafkaServiceFactory;3KafkaService kafkaService = KafkaServiceFactory.createKafkaService();4String consumerKey = kafkaService.getKafkaConsumerKey("mytopic", "mygroup");5kafkaService.subscribe(consumerKey, (key, message) -> {6 System.out.println(message);7});8import org.cerberus.service.kafka.impl.KafkaService;9import org.cerberus.service.kafka.impl.KafkaServiceFactory;10KafkaService kafkaService = KafkaServiceFactory.createKafkaService();11String producerKey = kafkaService.getKafkaProducerKey("mytopic");12kafkaService.send(producerKey, "Hello World");13import org.cerberus.service.kafka.impl.KafkaService;14import org.cerberus.service.kafka.impl.KafkaServiceFactory;15KafkaService kafkaService = KafkaServiceFactory.createKafkaService();16String producerKey = kafkaService.getKafkaProducerKey("mytopic");17kafkaService.send(producerKey, "Hello World");18import org.cerberus.service.kafka.impl.KafkaService;19import org.cerberus.service.kafka.impl.KafkaServiceFactory;20KafkaService kafkaService = KafkaServiceFactory.createKafkaService();21String consumerKey = kafkaService.getKafkaConsumerKey("mytopic", "mygroup");22kafkaService.subscribe(consumerKey, (key, message) -> {23 System.out.println(message);24});25import org.cerberus.service.kafka.impl.KafkaService;26import org.cerberus.service.kafka.impl.KafkaServiceFactory;27KafkaService kafkaService = KafkaServiceFactory.createKafkaService();28String producerKey = kafkaService.getKafkaProducerKey("mytopic");29kafkaService.send(producerKey, "Hello World");30import org.cerberus.service.kafka.impl.K
getKafkaConsumerKey
Using AI Code Generation
1KafkaService kafkaService = new KafkaService();2String kafkaConsumerKey = kafkaService.getKafkaConsumerKey();3KafkaConsumer<String, String> kafkaConsumer = kafkaService.getKafkaConsumer(kafkaConsumerKey);4kafkaService.subscribe(kafkaConsumer, "topic_name");5ConsumerRecords<String, String> records = kafkaService.poll(kafkaConsumer);6for (ConsumerRecord<String, String> record : records) {7 System.out.println(record.value());8}9kafkaService.close(kafkaConsumer);10kafkaService.close(kafkaConsumerKey);11String kafkaProducerKey = kafkaService.getKafkaProducerKey();12KafkaProducer<String, String> kafkaProducer = kafkaService.getKafkaProducer(kafkaProducerKey);13kafkaService.send(kafkaProducer, "topic_name", "message");14kafkaService.close(kafkaProducer);15kafkaService.close(kafkaProducerKey);16String kafkaProducerKey = kafkaService.getKafkaProducerKey();17KafkaProducer<String, String> kafkaProducer = kafkaService.getKafkaProducer(kafkaProducerKey);
getKafkaConsumerKey
Using AI Code Generation
1import org.cerberus.service.kafka.impl.KafkaService;2KafkaService kafkaService = new KafkaService();3String kafkaConsumerKey = kafkaService.getKafkaConsumerKey("myTopic", "group1");4import org.cerberus.service.kafka.impl.KafkaService;5KafkaService kafkaService = new KafkaService();6String kafkaConsumerKey = kafkaService.getKafkaConsumerKey("myTopic", "group1");
getKafkaConsumerKey
Using AI Code Generation
1String key = kafkaService.getKafkaConsumerKey();2String key = kafkaService.getKafkaConsumerKey();3String key = kafkaService.getKafkaConsumerKey();4String value = kafkaService.getKafkaConsumerValue();5String value = kafkaService.getKafkaConsumerValue();6String value = kafkaService.getKafkaConsumerValue();7String value = kafkaService.getKafkaConsumerValue();8String value = kafkaService.getKafkaConsumerValue();9String value = kafkaService.getKafkaConsumerValue();10String value = kafkaService.getKafkaConsumerValue();11String value = kafkaService.getKafkaConsumerValue();12String value = kafkaService.getKafkaConsumerValue();13String value = kafkaService.getKafkaConsumerValue();14String value = kafkaService.getKafkaConsumerValue();15String value = kafkaService.getKafkaConsumerValue();16String value = kafkaService.getKafkaConsumerValue();17String value = kafkaService.getKafkaConsumerValue();18String value = kafkaService.getKafkaConsumerValue();
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!