How to use the seekEvent method of the org.cerberus.service.kafka.impl.KafkaService class

A representative Cerberus-source code snippet using org.cerberus.service.kafka.impl.KafkaService.seekEvent.

Source:KafkaService.java Github

copy

Full Screen

...142 return result;143 }144 @SuppressWarnings("unchecked")145 @Override146 public AnswerItem<Map<TopicPartition, Long>> seekEvent(String topic, String bootstrapServers,147 List<AppServiceHeader> serviceHeader) throws InterruptedException, ExecutionException {148 MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA);149 AnswerItem<Map<TopicPartition, Long>> result = new AnswerItem<>();150 KafkaConsumer consumer = null;151 try {152 Properties props = new Properties();153 serviceHeader.add(factoryAppServiceHeader.create(null, "bootstrap.servers", bootstrapServers, "Y", 0, "", "", null, "", null));154 serviceHeader.add(factoryAppServiceHeader.create(null, "enable.auto.commit", "false", "Y", 0, "", "", null, "", null));155 serviceHeader.add(factoryAppServiceHeader.create(null, "max.poll.records", "10", "Y", 0, "", "", null, "", null));156 serviceHeader.add(factoryAppServiceHeader.create(null, "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));157 serviceHeader.add(factoryAppServiceHeader.create(null, "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));158 for (AppServiceHeader object : serviceHeader) {159 if (StringUtil.parseBoolean(object.getActive())) {160 props.put(object.getKey(), object.getValue());161 }162 }163 LOG.info("Open Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));164 consumer = new KafkaConsumer<>(props);165 //Get a list of the topics' partitions166 List<PartitionInfo> partitionList = consumer.partitionsFor(topic);167 List<TopicPartition> topicPartitionList = partitionList.stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toList());168 //Assign all the partitions to this consumer169 consumer.assign(topicPartitionList);170 consumer.seekToEnd(topicPartitionList); //default to latest offset for all partitions171 HashMap<TopicPartition, Long> 
valueResult = new HashMap<>();172 Map<TopicPartition, Long> partitionOffset = consumer.endOffsets(topicPartitionList);173 result.setItem(partitionOffset);174 } catch (Exception ex) {175 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKKAFKA);176 message.setDescription(message.getDescription().replace("%EX%", ex.toString()).replace("%TOPIC%", topic));177 LOG.debug(ex, ex);178 } finally {179 consumer.close();180 LOG.info("Closed Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));181 }182 result.setResultMessage(message);183 return result;184 }185 @SuppressWarnings("unchecked")186 @Override187 public AnswerItem<String> searchEvent(Map<TopicPartition, Long> mapOffsetPosition, String topic, String bootstrapServers,188 List<AppServiceHeader> serviceHeader, String filterPath, String filterValue, int targetNbEventsInt, int targetNbSecInt) throws InterruptedException, ExecutionException {189 MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);190 AnswerItem<String> result = new AnswerItem<>();191 AppService serviceREST = factoryAppService.create("", AppService.TYPE_KAFKA, AppService.METHOD_KAFKASEARCH, "", "", "", "", "", "", "", "", "", "", "", "", null, "", null, null);192 Instant date1 = Instant.now();193 JSONArray resultJSON = new JSONArray();194 KafkaConsumer consumer = null;195 int nbFound = 0;196 int nbEvents = 0;197 try {198 Properties props = new Properties();199 serviceHeader.add(factoryAppServiceHeader.create(null, "bootstrap.servers", bootstrapServers, "Y", 0, "", "", null, "", null));200 serviceHeader.add(factoryAppServiceHeader.create(null, "enable.auto.commit", "false", "Y", 0, "", "", null, "", null));201 serviceHeader.add(factoryAppServiceHeader.create(null, "max.poll.records", "10", "Y", 0, "", "", null, "", null));202 serviceHeader.add(factoryAppServiceHeader.create(null, "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", 
null));203 serviceHeader.add(factoryAppServiceHeader.create(null, "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));204 for (AppServiceHeader object : serviceHeader) {205 if (StringUtil.parseBoolean(object.getActive())) {206 props.put(object.getKey(), object.getValue());207 }208 }209 LOG.info("Open Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));210 consumer = new KafkaConsumer<>(props);211 //Get a list of the topics' partitions212 List<PartitionInfo> partitionList = consumer.partitionsFor(topic);213 List<TopicPartition> topicPartitionList = partitionList.stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toList());214 //Assign all the partitions to this consumer215 consumer.assign(topicPartitionList);216 // Setting each partition to correct Offset.217 for (Map.Entry<TopicPartition, Long> entry : mapOffsetPosition.entrySet()) {218 consumer.seek(entry.getKey(), entry.getValue());219 LOG.debug("Partition : " + entry.getKey().partition() + " set to offset : " + entry.getValue());220 }221 boolean consume = true;222 long timeoutTime = Instant.now().plusSeconds(targetNbSecInt).toEpochMilli(); //default to 30 seconds223 int pollDurationSec = 5;224 if (targetNbSecInt < pollDurationSec) {225 pollDurationSec = targetNbSecInt;226 }227 while (consume) {228 LOG.debug("Start Poll.");229 @SuppressWarnings("unchecked")230 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(pollDurationSec));231 LOG.debug("End Poll.");232 if (Instant.now().toEpochMilli() > timeoutTime) {233 LOG.debug("Timed out searching for record");234 consumer.wakeup(); //exit235 }236 //Now for each record in the batch of records we got from Kafka237 for (ConsumerRecord<String, String> record : records) {238 try {239 LOG.debug("New record " + record.topic() + " " + record.partition() + " " + record.offset());240 LOG.debug(" " + record.key() + " | " + record.value());241 
JSONObject recordJSON = new JSONObject(record.value());242 nbEvents++;243 boolean match = true;244 if (!StringUtil.isNullOrEmpty(filterPath)) {245 String recordJSONfiltered = "";246 try {247 recordJSONfiltered = jsonService.getStringFromJson(record.value(), filterPath);248 } catch (PathNotFoundException ex) {249 //Catch any exceptions thrown from message processing/testing as they should have already been reported/dealt with250 //but we don't want to trigger the catch block for Kafka consumption251 match = false;252 LOG.debug("Record discarded - Path not found.");253 } catch (Exception ex) {254 LOG.error(ex, ex);255 }256 LOG.debug("Filtered value : " + recordJSONfiltered);257 if (!recordJSONfiltered.equals(filterValue)) {258 match = false;259 LOG.debug("Record discarded - Value different.");260 }261 }262 if (match) {263 JSONObject messageJSON = new JSONObject();264 messageJSON.put("key", record.key());265 messageJSON.put("value", recordJSON);266 messageJSON.put("offset", record.offset());267 messageJSON.put("partition", record.partition());268 resultJSON.put(messageJSON);269 nbFound++;270 if (nbFound >= targetNbEventsInt) {271 consume = false; //exit the consume loop272 consumer.wakeup(); //takes effect on the next poll loop so need to break.273 break; //if we've found a match, stop looping through the current record batch274 }275 }276 } catch (Exception ex) {277 //Catch any exceptions thrown from message processing/testing as they should have already been reported/dealt with278 //but we don't want to trigger the catch block for Kafka consumption279 LOG.error(ex, ex);280 }281 }282 }283 result.setItem(resultJSON.toString());284 Instant date2 = Instant.now();285 Duration duration = Duration.between(date1, date2);286 message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA)287 .resolveDescription("NBEVENT", String.valueOf(nbFound))288 .resolveDescription("NBTOT", String.valueOf(nbEvents))289 .resolveDescription("NBSEC", 
String.valueOf(duration.getSeconds()));290 } catch (WakeupException e) {291 result.setItem(resultJSON.toString());292 Instant date2 = Instant.now();293 Duration duration = Duration.between(date1, date2);294 message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKAPARTIALRESULT)295 .resolveDescription("NBEVENT", String.valueOf(nbFound))296 .resolveDescription("NBTOT", String.valueOf(nbEvents))297 .resolveDescription("NBSEC", String.valueOf(duration.getSeconds()));298 //Ignore299 } catch (Exception ex) {300 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);301 message.setDescription(message.getDescription().replace("%EX%", ex.toString()));302 LOG.debug(ex, ex);303 } finally {304 if (consumer != null) {305 LOG.info("Closed Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));306 consumer.close();307 }308 }309 result.setItem(resultJSON.toString());310 message.setDescription(message.getDescription().replace("%SERVICEMETHOD%", AppService.METHOD_KAFKASEARCH).replace("%TOPIC%", topic));311 result.setResultMessage(message);312 return result;313 }314 @Override315 public HashMap<String, Map<TopicPartition, Long>> getAllConsumers(List<TestCaseStep> mainExecutionTestCaseStepList) throws CerberusException, InterruptedException, ExecutionException {316 HashMap<String, Map<TopicPartition, Long>> tempKafka = new HashMap<>();317 AnswerItem<Map<TopicPartition, Long>> resultConsume = new AnswerItem<>();318 for (TestCaseStep testCaseStep : mainExecutionTestCaseStepList) {319 for (TestCaseStepAction testCaseStepAction : testCaseStep.getTestCaseStepAction()) {320 if (testCaseStepAction.getAction().equals(TestCaseStepAction.ACTION_CALLSERVICE)321 && !testCaseStepAction.getConditionOper().equals(TestCaseStepAction.CONDITIONOPER_NEVER)) {322 AnswerItem<AppService> localService = appServiceService.readByKeyWithDependency(testCaseStepAction.getValue1(), "Y");323 if (localService.getItem() != null) {324 if 
(localService.getItem().getType().equals(AppService.TYPE_KAFKA) && localService.getItem().getMethod().equals(AppService.METHOD_KAFKASEARCH)) {325 resultConsume = seekEvent(localService.getItem().getKafkaTopic(), localService.getItem().getServicePath(), localService.getItem().getHeaderList());326 if (!(resultConsume.isCodeEquals(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA.getCode()))) {327 LOG.debug("TestCase interupted due to error when opening Kafka consume. " + resultConsume.getMessageDescription());328 throw new CerberusException(new MessageGeneral(MessageGeneralEnum.VALIDATION_FAILED_KAFKACONSUMERSEEK)329 .resolveDescription("DETAIL", resultConsume.getMessageDescription()));330 }331 tempKafka.put(getKafkaConsumerKey(localService.getItem().getKafkaTopic(), localService.getItem().getServicePath()), resultConsume.getItem());332 }333 }334 }335 }336 }337 LOG.debug(tempKafka.size() + " consumers lastest offset retrieved.");338 return tempKafka;339 }...

Full Screen

Full Screen

seekEvent

Using AI Code Generation

copy

Full Screen

1import org.cerberus.service.kafka.impl.KafkaService;2import org.cerberus.service.kafka.impl.KafkaServiceFactory;3import org.cerberus.service.kafka.impl.KafkaTopic;4import org.cerberus.service.kafka.impl.KafkaTopicFactory;5import org.cerberus.util.CerberusException;6import org.cerberus.util.CerberusExceptionFactory;7KafkaService kafkaService = KafkaServiceFactory.create();8KafkaTopic kafkaTopic = KafkaTopicFactory.create("test-topic");9CerberusException cerberusException = CerberusExceptionFactory.create();10kafkaService.seekEvent(kafkaTopic, cerberusException);11if (cerberusException != null) {12 System.out.println(cerberusException.getMessage());13}14if (cerberusException == null) {15 System.out.println("seekEvent method executed successfully");16}17System.out.println(kafkaService);18System.out.println(kafkaTopic);19System.out.println(cerberusException);

Full Screen

Full Screen

seekEvent

Using AI Code Generation

copy

Full Screen

1import org.cerberus.service.kafka.impl.KafkaService2import org.cerberus.service.kafka.impl.KafkaServiceFactory3import org.cerberus.service.kafka.impl.KafkaServiceImpl4import org.cerberus.service.kafka.impl.KafkaServiceFactoryImpl5import org.cerberus.service.kafka.impl.KafkaServiceFactory6import org.cerberus.service.kafka.impl.KafkaServiceImpl7import org.cerberus.service.kafka.impl.KafkaServiceFactoryImpl8import org.cerberus.service.kafka.impl.KafkaServiceFactory9import org.cerberus.service.kafka.impl.KafkaServiceImpl10import org.cerberus.service.kafka.impl.KafkaServiceFactoryImpl11import org.cerberus.service.kafka.impl.KafkaServiceFactory12import org.cerberus.service.kafka.impl.KafkaServiceImpl13import org.cerberus.service.kafka.impl.KafkaServiceFactoryImpl14import org.cerberus.service.kafka.impl.KafkaServiceFactory15import org.cerberus.service.kafka.impl.KafkaServiceImpl16import org.cerberus.service.kafka.impl.KafkaServiceFactoryImpl17import org.cerberus.service.kafka.impl.KafkaServiceFactory18import org.cerberus.service.kafka.impl.KafkaServiceImpl19import org.cerberus.service.kafka.impl.KafkaServiceFactoryImpl20import org.cerberus.service.kafka.impl.KafkaServiceFactory21import org.cerberus.service.kafka.impl.KafkaServiceImpl22import org.cerberus.service.kafka.impl.KafkaServiceFactoryImpl23import org.cerberus.service.kafka.impl.KafkaServiceFactory24import org.cerberus.service.kafka.impl.KafkaServiceImpl25import org.cerberus.service.kafka.impl.KafkaServiceFactoryImpl26import org.cerberus.service.kafka.impl.KafkaServiceFactory27import org.cerberus.service.kafka.impl.KafkaServiceImpl28import org.cerberus.service.kafka.impl.KafkaServiceFactoryImpl29import org.cerberus.service.kafka.impl.KafkaServiceFactory30import org.cerberus.service.kafka.impl.KafkaServiceImpl31import org.cerberus.service.kafka.impl.KafkaServiceFactoryImpl32import org.cerberus.service.kafka.impl.KafkaServiceFactory33import org.cerberus.service.kafka.impl.KafkaServiceImpl34import 
org.cerberus.service.kafka.impl.KafkaServiceFactoryImpl35import org.cerberus.service.kafka.impl.KafkaServiceFactory36import org.cerberus.service.kafka.impl

Full Screen

Full Screen

seekEvent

Using AI Code Generation

copy

Full Screen

1import org.cerberus.service.kafka.impl.KafkaService;2import org.cerberus.service.kafka.impl.KafkaServiceFactory;3import org.cerberus.util.KafkaUtil;4import java.util.Properties;5public class KafkaSeekEventExample {6 public static void main(String[] args) throws Exception {7 Properties properties = KafkaUtil.loadProperties("kafka.properties");8 KafkaService kafkaService = KafkaServiceFactory.getKafkaService(properties);9 kafkaService.seekEvent("test_topic", 0, 0);10 }11}

Full Screen

Full Screen

seekEvent

Using AI Code Generation

copy

Full Screen

1KafkaService kafkaService = new KafkaService();2KafkaTopic kafkaTopic = new KafkaTopic();3kafkaTopic.setTopic("topicName");4kafkaTopic.setPartition("0");5kafkaTopic.setOffset("0");6kafkaTopic.setTopic("topicName");7kafkaTopic.setPartition("0");8kafkaTopic.setOffset("0");9kafkaTopic.setTopic("topicName");10kafkaTopic.setPartition("0");11kafkaTopic.setOffset("0");12kafkaTopic.setTopic("topicName");13kafkaTopic.setPartition("0");14kafkaTopic.setOffset("0");15kafkaService.addTopic(kafkaTopic);16kafkaService.seekEvent();17kafkaService.seekEvent();18kafkaService.seekEvent();19kafkaService.seekEvent();

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Cerberus-source automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful