How to use answer method of org.easymock.internal.Result class

Best Easymock code snippet using org.easymock.internal.Result.answer

Source:InternalTopicManagerTest.java Github

copy

Full Screen

1/*2 * Licensed to the Apache Software Foundation (ASF) under one or more3 * contributor license agreements. See the NOTICE file distributed with4 * this work for additional information regarding copyright ownership.5 * The ASF licenses this file to You under the Apache License, Version 2.06 * (the "License"); you may not use this file except in compliance with7 * the License. You may obtain a copy of the License at8 *9 * http://www.apache.org/licenses/LICENSE-2.010 *11 * Unless required by applicable law or agreed to in writing, software12 * distributed under the License is distributed on an "AS IS" BASIS,13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14 * See the License for the specific language governing permissions and15 * limitations under the License.16 */17package org.apache.kafka.streams.processor.internals;18import org.apache.kafka.clients.admin.AdminClient;19import org.apache.kafka.clients.admin.Config;20import org.apache.kafka.clients.admin.ConfigEntry;21import org.apache.kafka.clients.admin.CreateTopicsOptions;22import org.apache.kafka.clients.admin.CreateTopicsResult;23import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;24import org.apache.kafka.clients.admin.DeleteTopicsResult;25import org.apache.kafka.clients.admin.DescribeConfigsResult;26import org.apache.kafka.clients.admin.DescribeTopicsResult;27import org.apache.kafka.clients.admin.MockAdminClient;28import org.apache.kafka.clients.admin.NewTopic;29import org.apache.kafka.clients.admin.TopicDescription;30import org.apache.kafka.clients.consumer.ConsumerConfig;31import org.apache.kafka.clients.producer.ProducerConfig;32import org.apache.kafka.common.KafkaFuture;33import org.apache.kafka.common.Node;34import org.apache.kafka.common.TopicPartitionInfo;35import org.apache.kafka.common.Uuid;36import org.apache.kafka.common.config.ConfigResource;37import org.apache.kafka.common.config.ConfigResource.Type;38import org.apache.kafka.common.config.TopicConfig;39import org.apache.kafka.common.errors.LeaderNotAvailableException;40import org.apache.kafka.common.errors.TimeoutException;41import org.apache.kafka.common.errors.TopicExistsException;42import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;43import org.apache.kafka.common.errors.UnsupportedVersionException;44import org.apache.kafka.common.internals.KafkaFutureImpl;45import org.apache.kafka.common.message.CreateTopicsRequestData;46import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignmentCollection;47import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;48import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;49import org.apache.kafka.common.requests.CreateTopicsRequest;50import org.apache.kafka.common.utils.MockTime;51import org.apache.kafka.streams.StreamsConfig;52import org.apache.kafka.streams.errors.StreamsException;53import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult;54import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;55import org.easymock.EasyMock;56import org.junit.After;57import org.junit.Before;58import org.junit.Test;59import java.util.ArrayList;60import java.util.Collection;61import java.util.Collections;62import java.util.HashMap;63import java.util.List;64import java.util.Map;65import java.util.Optional;66import java.util.Set;67import java.util.stream.Collectors;68import static org.apache.kafka.common.utils.Utils.mkEntry;69import static org.apache.kafka.common.utils.Utils.mkMap;70import static org.apache.kafka.common.utils.Utils.mkSet;71import static org.hamcrest.MatcherAssert.assertThat;72import static org.hamcrest.Matchers.anEmptyMap;73import static org.hamcrest.Matchers.equalTo;74import static org.hamcrest.Matchers.empty;75import static org.hamcrest.Matchers.hasItem;76import static org.hamcrest.Matchers.hasKey;77import static org.hamcrest.Matchers.is;78import static org.hamcrest.Matchers.not;79import static org.junit.Assert.assertEquals;80import static org.junit.Assert.assertNull;81import static org.junit.Assert.assertThrows;82import static org.junit.Assert.fail;83public class InternalTopicManagerTest {84 private final Node broker1 = new Node(0, "dummyHost-1", 1234);85 private final Node broker2 = new Node(1, "dummyHost-2", 1234);86 private final List<Node> cluster = new ArrayList<Node>(2) {87 {88 add(broker1);89 add(broker2);90 }91 };92 private final String topic1 = "test_topic";93 private final String topic2 = "test_topic_2";94 private final String topic3 = "test_topic_3";95 private final String topic4 = "test_topic_4";96 private final String topic5 = "test_topic_5";97 private final List<Node> singleReplica = Collections.singletonList(broker1);98 private String threadName;99 private MockAdminClient mockAdminClient;100 private InternalTopicManager internalTopicManager;101 private final MockTime time = new MockTime(0);102 private final Map<String, Object> config = new HashMap<String, Object>() {103 {104 put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");105 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker1.host() + ":" + broker1.port());106 put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);107 put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 16384);108 put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 100);109 put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 10);110 }111 };112 @Before113 public void init() {114 threadName = Thread.currentThread().getName();115 mockAdminClient = new MockAdminClient(cluster, broker1);116 internalTopicManager = new InternalTopicManager(117 time,118 mockAdminClient,119 new StreamsConfig(config)120 );121 }122 @After123 public void shutdown() {124 mockAdminClient.close();125 }126 @Test127 public void shouldCreateTopics() throws Exception {128 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);129 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);130 internalTopicManager.setup(mkMap(131 mkEntry(topic1, internalTopicConfig1),132 mkEntry(topic2, internalTopicConfig2)133 ));134 final Set<String> newlyCreatedTopics = mockAdminClient.listTopics().names().get();135 assertThat(newlyCreatedTopics.size(), is(2));136 assertThat(newlyCreatedTopics, hasItem(topic1));137 assertThat(newlyCreatedTopics, hasItem(topic2));138 }139 @Test140 public void shouldNotCreateTopicsWithEmptyInput() throws Exception {141 internalTopicManager.setup(Collections.emptyMap());142 final Set<String> newlyCreatedTopics = mockAdminClient.listTopics().names().get();143 assertThat(newlyCreatedTopics, empty());144 }145 @Test146 public void shouldOnlyRetryNotSuccessfulFuturesDuringSetup() {147 final AdminClient admin = EasyMock.createMock(AdminClient.class);148 final StreamsConfig streamsConfig = new StreamsConfig(config);149 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);150 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicFailFuture = new KafkaFutureImpl<>();151 createTopicFailFuture.completeExceptionally(new TopicExistsException("exists"));152 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicSuccessfulFuture = new KafkaFutureImpl<>();153 createTopicSuccessfulFuture.complete(154 new TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList()))155 );156 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);157 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);158 final NewTopic newTopic1 = newTopic(topic1, internalTopicConfig1, streamsConfig);159 final NewTopic newTopic2 = newTopic(topic2, internalTopicConfig2, streamsConfig);160 EasyMock.expect(admin.createTopics(mkSet(newTopic1, newTopic2)))161 .andAnswer(() -> new MockCreateTopicsResult(mkMap(162 mkEntry(topic1, createTopicSuccessfulFuture),163 mkEntry(topic2, createTopicFailFuture)164 )));165 EasyMock.expect(admin.createTopics(mkSet(newTopic2)))166 .andAnswer(() -> new MockCreateTopicsResult(mkMap(167 mkEntry(topic2, createTopicSuccessfulFuture)168 )));169 EasyMock.replay(admin);170 topicManager.setup(mkMap(171 mkEntry(topic1, internalTopicConfig1),172 mkEntry(topic2, internalTopicConfig2)173 ));174 EasyMock.verify(admin);175 }176 @Test177 public void shouldRetryCreateTopicWhenCreationTimesOut() {178 shouldRetryCreateTopicWhenRetriableExceptionIsThrown(new TimeoutException("timed out"));179 }180 @Test181 public void shouldRetryCreateTopicWhenTopicNotYetDeleted() {182 shouldRetryCreateTopicWhenRetriableExceptionIsThrown(new TopicExistsException("exists"));183 }184 private void shouldRetryCreateTopicWhenRetriableExceptionIsThrown(final Exception retriableException) {185 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);186 final StreamsConfig streamsConfig = new StreamsConfig(config);187 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);188 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicFailFuture = new KafkaFutureImpl<>();189 createTopicFailFuture.completeExceptionally(retriableException);190 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicSuccessfulFuture = new KafkaFutureImpl<>();191 createTopicSuccessfulFuture.complete(192 new TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList()))193 );194 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);195 final NewTopic newTopic = newTopic(topic1, internalTopicConfig, streamsConfig);196 EasyMock.expect(admin.createTopics(mkSet(newTopic)))197 .andAnswer(() -> new MockCreateTopicsResult(mkMap(198 mkEntry(topic1, createTopicSuccessfulFuture)199 )));200 EasyMock.expect(admin.createTopics(mkSet(newTopic)))201 .andAnswer(() -> new MockCreateTopicsResult(mkMap(202 mkEntry(topic2, createTopicSuccessfulFuture)203 )));204 EasyMock.replay(admin);205 topicManager.setup(mkMap(206 mkEntry(topic1, internalTopicConfig)207 ));208 }209 @Test210 public void shouldThrowInformativeExceptionForOlderBrokers() {211 final AdminClient admin = new MockAdminClient() {212 @Override213 public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,214 final CreateTopicsOptions options) {215 final CreatableTopic topicToBeCreated = new CreatableTopic();216 topicToBeCreated.setAssignments(new CreatableReplicaAssignmentCollection());217 topicToBeCreated.setNumPartitions((short) 1);218 // set unsupported replication factor for older brokers219 topicToBeCreated.setReplicationFactor((short) -1);220 final CreatableTopicCollection topicsToBeCreated = new CreatableTopicCollection();221 topicsToBeCreated.add(topicToBeCreated);222 try {223 new CreateTopicsRequest.Builder(224 new CreateTopicsRequestData()225 .setTopics(topicsToBeCreated)226 .setTimeoutMs(0)227 .setValidateOnly(options.shouldValidateOnly()))228 .build((short) 3); // pass in old unsupported request version for old brokers229 throw new IllegalStateException("Building CreateTopicRequest should have thrown.");230 } catch (final UnsupportedVersionException expected) {231 final KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>();232 future.completeExceptionally(expected);233 return new CreateTopicsResult(Collections.singletonMap(topic1, future)) { };234 }235 }236 };237 final StreamsConfig streamsConfig = new StreamsConfig(config);238 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);239 final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());240 topicConfig.setNumberOfPartitions(1);241 final StreamsException exception = assertThrows(242 StreamsException.class,243 () -> topicManager.makeReady(Collections.singletonMap(topic1, topicConfig))244 );245 assertThat(246 exception.getMessage(),247 equalTo("Could not create topic " + topic1 + ", because brokers don't support configuration replication.factor=-1."248 + " You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error."));249 }250 @Test251 public void shouldThrowTimeoutExceptionIfTopicExistsDuringSetup() {252 setupTopicInMockAdminClient(topic1, Collections.emptyMap());253 final MockTime time = new MockTime(254 (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 15255 );256 final InternalTopicManager internalTopicManager =257 new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config));258 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);259 final TimeoutException exception = assertThrows(260 TimeoutException.class,261 () -> internalTopicManager.setup(Collections.singletonMap(topic1, internalTopicConfig))262 );263 assertThat(264 exception.getMessage(),265 is("Could not create internal topics within " +266 (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 2 +267 " milliseconds. This can happen if the Kafka cluster is temporarily not available or a topic is marked" +268 " for deletion and the broker did not complete its deletion within the timeout." +269 " The last errors seen per topic are:" +270 " {" + topic1 + "=org.apache.kafka.common.errors.TopicExistsException: Topic test_topic exists already.}")271 );272 }273 @Test274 public void shouldThrowWhenCreateTopicsThrowsUnexpectedException() {275 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);276 final StreamsConfig streamsConfig = new StreamsConfig(config);277 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);278 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);279 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicFailFuture = new KafkaFutureImpl<>();280 createTopicFailFuture.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));281 final NewTopic newTopic = newTopic(topic1, internalTopicConfig, streamsConfig);282 EasyMock.expect(admin.createTopics(mkSet(newTopic)))283 .andStubAnswer(() -> new MockCreateTopicsResult(mkMap(284 mkEntry(topic1, createTopicFailFuture)285 )));286 EasyMock.replay(admin);287 assertThrows(StreamsException.class, () -> topicManager.setup(mkMap(288 mkEntry(topic1, internalTopicConfig)289 )));290 }291 @Test292 public void shouldThrowWhenCreateTopicsResultsDoNotContainTopic() {293 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);294 final StreamsConfig streamsConfig = new StreamsConfig(config);295 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);296 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);297 final NewTopic newTopic = newTopic(topic1, internalTopicConfig, streamsConfig);298 EasyMock.expect(admin.createTopics(mkSet(newTopic)))299 .andStubAnswer(() -> new MockCreateTopicsResult(Collections.singletonMap(topic2, new KafkaFutureImpl<>())));300 EasyMock.replay(admin);301 assertThrows(302 IllegalStateException.class,303 () -> topicManager.setup(Collections.singletonMap(topic1, internalTopicConfig))304 );305 }306 @Test307 public void shouldThrowTimeoutExceptionWhenCreateTopicExceedsTimeout() {308 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);309 final MockTime time = new MockTime(310 (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 3311 );312 final StreamsConfig streamsConfig = new StreamsConfig(config);313 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);314 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicFailFuture = new KafkaFutureImpl<>();315 createTopicFailFuture.completeExceptionally(new TimeoutException());316 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);317 final NewTopic newTopic = newTopic(topic1, internalTopicConfig, streamsConfig);318 EasyMock.expect(admin.createTopics(mkSet(newTopic)))319 .andStubAnswer(() -> new MockCreateTopicsResult(mkMap(mkEntry(topic1, createTopicFailFuture))));320 EasyMock.replay(admin);321 assertThrows(322 TimeoutException.class,323 () -> topicManager.setup(Collections.singletonMap(topic1, internalTopicConfig))324 );325 }326 @Test327 public void shouldThrowTimeoutExceptionWhenFuturesNeverCompleteDuringSetup() {328 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);329 final MockTime time = new MockTime(330 (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 3331 );332 final StreamsConfig streamsConfig = new StreamsConfig(config);333 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);334 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicFutureThatNeverCompletes = new KafkaFutureImpl<>();335 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);336 final NewTopic newTopic = newTopic(topic1, internalTopicConfig, streamsConfig);337 EasyMock.expect(admin.createTopics(mkSet(newTopic)))338 .andStubAnswer(() -> new MockCreateTopicsResult(mkMap(mkEntry(topic1, createTopicFutureThatNeverCompletes))));339 EasyMock.replay(admin);340 assertThrows(341 TimeoutException.class,342 () -> topicManager.setup(Collections.singletonMap(topic1, internalTopicConfig))343 );344 }345 @Test346 public void shouldCleanUpWhenUnexpectedExceptionIsThrownDuringSetup() {347 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);348 final StreamsConfig streamsConfig = new StreamsConfig(config);349 final MockTime time = new MockTime(350 (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 3351 );352 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);353 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);354 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);355 setupCleanUpScenario(admin, streamsConfig, internalTopicConfig1, internalTopicConfig2);356 final KafkaFutureImpl<Void> deleteTopicSuccessfulFuture = new KafkaFutureImpl<>();357 deleteTopicSuccessfulFuture.complete(null);358 EasyMock.expect(admin.deleteTopics(mkSet(topic1)))359 .andAnswer(() -> new MockDeleteTopicsResult(mkMap(mkEntry(topic1, deleteTopicSuccessfulFuture))));360 EasyMock.replay(admin);361 assertThrows(362 StreamsException.class,363 () -> topicManager.setup(mkMap(364 mkEntry(topic1, internalTopicConfig1),365 mkEntry(topic2, internalTopicConfig2)366 ))367 );368 EasyMock.verify(admin);369 }370 @Test371 public void shouldCleanUpWhenCreateTopicsResultsDoNotContainTopic() {372 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);373 final StreamsConfig streamsConfig = new StreamsConfig(config);374 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);375 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);376 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);377 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicFailFuture1 = new KafkaFutureImpl<>();378 createTopicFailFuture1.completeExceptionally(new TopicExistsException("exists"));379 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicSuccessfulFuture = new KafkaFutureImpl<>();380 createTopicSuccessfulFuture.complete(381 new TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList()))382 );383 final NewTopic newTopic1 = newTopic(topic1, internalTopicConfig1, streamsConfig);384 final NewTopic newTopic2 = newTopic(topic2, internalTopicConfig2, streamsConfig);385 EasyMock.expect(admin.createTopics(mkSet(newTopic1, newTopic2)))386 .andAnswer(() -> new MockCreateTopicsResult(mkMap(387 mkEntry(topic1, createTopicSuccessfulFuture),388 mkEntry(topic2, createTopicFailFuture1)389 )));390 EasyMock.expect(admin.createTopics(mkSet(newTopic2)))391 .andAnswer(() -> new MockCreateTopicsResult(mkMap(392 mkEntry(topic3, createTopicSuccessfulFuture)393 )));394 final KafkaFutureImpl<Void> deleteTopicSuccessfulFuture = new KafkaFutureImpl<>();395 deleteTopicSuccessfulFuture.complete(null);396 EasyMock.expect(admin.deleteTopics(mkSet(topic1)))397 .andAnswer(() -> new MockDeleteTopicsResult(mkMap(mkEntry(topic1, deleteTopicSuccessfulFuture))));398 EasyMock.replay(admin);399 assertThrows(400 IllegalStateException.class,401 () -> topicManager.setup(mkMap(402 mkEntry(topic1, internalTopicConfig1),403 mkEntry(topic2, internalTopicConfig2)404 ))405 );406 EasyMock.verify(admin);407 }408 @Test409 public void shouldCleanUpWhenCreateTopicsTimesOut() {410 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);411 final StreamsConfig streamsConfig = new StreamsConfig(config);412 final MockTime time = new MockTime(413 (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 3414 );415 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);416 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);417 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);418 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicFailFuture1 = new KafkaFutureImpl<>();419 createTopicFailFuture1.completeExceptionally(new TopicExistsException("exists"));420 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicSuccessfulFuture = new KafkaFutureImpl<>();421 createTopicSuccessfulFuture.complete(422 new TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList()))423 );424 final NewTopic newTopic1 = newTopic(topic1, internalTopicConfig1, streamsConfig);425 final NewTopic newTopic2 = newTopic(topic2, internalTopicConfig2, streamsConfig);426 EasyMock.expect(admin.createTopics(mkSet(newTopic1, newTopic2)))427 .andAnswer(() -> new MockCreateTopicsResult(mkMap(428 mkEntry(topic1, createTopicSuccessfulFuture),429 mkEntry(topic2, createTopicFailFuture1)430 )));431 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicFutureThatNeverCompletes = new KafkaFutureImpl<>();432 EasyMock.expect(admin.createTopics(mkSet(newTopic2)))433 .andStubAnswer(() -> new MockCreateTopicsResult(mkMap(mkEntry(topic2, createTopicFutureThatNeverCompletes))));434 final KafkaFutureImpl<Void> deleteTopicSuccessfulFuture = new KafkaFutureImpl<>();435 deleteTopicSuccessfulFuture.complete(null);436 EasyMock.expect(admin.deleteTopics(mkSet(topic1)))437 .andAnswer(() -> new MockDeleteTopicsResult(mkMap(mkEntry(topic1, deleteTopicSuccessfulFuture))));438 EasyMock.replay(admin);439 assertThrows(440 TimeoutException.class,441 () -> topicManager.setup(mkMap(442 mkEntry(topic1, internalTopicConfig1),443 mkEntry(topic2, internalTopicConfig2)444 ))445 );446 EasyMock.verify(admin);447 }448 @Test449 public void shouldRetryDeleteTopicWhenTopicUnknown() {450 shouldRetryDeleteTopicWhenRetriableException(new UnknownTopicOrPartitionException());451 }452 @Test453 public void shouldRetryDeleteTopicWhenLeaderNotAvailable() {454 shouldRetryDeleteTopicWhenRetriableException(new LeaderNotAvailableException("leader not available"));455 }456 @Test457 public void shouldRetryDeleteTopicWhenFutureTimesOut() {458 shouldRetryDeleteTopicWhenRetriableException(new TimeoutException("timed out"));459 }460 private void shouldRetryDeleteTopicWhenRetriableException(final Exception retriableException) {461 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);462 final StreamsConfig streamsConfig = new StreamsConfig(config);463 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);464 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);465 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);466 setupCleanUpScenario(admin, streamsConfig, internalTopicConfig1, internalTopicConfig2);467 final KafkaFutureImpl<Void> deleteTopicFailFuture = new KafkaFutureImpl<>();468 deleteTopicFailFuture.completeExceptionally(retriableException);469 final KafkaFutureImpl<Void> deleteTopicSuccessfulFuture = new KafkaFutureImpl<>();470 deleteTopicSuccessfulFuture.complete(null);471 EasyMock.expect(admin.deleteTopics(mkSet(topic1)))472 .andAnswer(() -> new MockDeleteTopicsResult(mkMap(mkEntry(topic1, deleteTopicFailFuture))))473 .andAnswer(() -> new MockDeleteTopicsResult(mkMap(mkEntry(topic1, deleteTopicSuccessfulFuture))));474 EasyMock.replay(admin);475 assertThrows(476 StreamsException.class,477 () -> topicManager.setup(mkMap(478 mkEntry(topic1, internalTopicConfig1),479 mkEntry(topic2, internalTopicConfig2)480 ))481 );482 EasyMock.verify();483 }484 @Test485 public void shouldThrowTimeoutExceptionWhenFuturesNeverCompleteDuringCleanUp() {486 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);487 final StreamsConfig streamsConfig = new StreamsConfig(config);488 final MockTime time = new MockTime(489 (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 3490 );491 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);492 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);493 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);494 setupCleanUpScenario(admin, streamsConfig, internalTopicConfig1, internalTopicConfig2);495 final KafkaFutureImpl<Void> deleteTopicFutureThatNeverCompletes = new KafkaFutureImpl<>();496 EasyMock.expect(admin.deleteTopics(mkSet(topic1)))497 .andStubAnswer(() -> new MockDeleteTopicsResult(mkMap(mkEntry(topic1, deleteTopicFutureThatNeverCompletes))));498 EasyMock.replay(admin);499 assertThrows(500 TimeoutException.class,501 () -> topicManager.setup(mkMap(502 mkEntry(topic1, internalTopicConfig1),503 mkEntry(topic2, internalTopicConfig2)504 ))505 );506 }507 @Test508 public void shouldThrowWhenDeleteTopicsThrowsUnexpectedException() {509 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);510 final StreamsConfig streamsConfig = new StreamsConfig(config);511 final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);512 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);513 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);514 setupCleanUpScenario(admin, streamsConfig, internalTopicConfig1, internalTopicConfig2);515 final KafkaFutureImpl<Void> deleteTopicFailFuture = new KafkaFutureImpl<>();516 deleteTopicFailFuture.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));517 EasyMock.expect(admin.deleteTopics(mkSet(topic1)))518 .andStubAnswer(() -> new MockDeleteTopicsResult(mkMap(mkEntry(topic1, deleteTopicFailFuture))));519 EasyMock.replay(admin);520 assertThrows(521 StreamsException.class,522 () -> topicManager.setup(mkMap(523 mkEntry(topic1, internalTopicConfig1),524 mkEntry(topic2, internalTopicConfig2)525 ))526 );527 }528 private void setupCleanUpScenario(final AdminClient admin, final StreamsConfig streamsConfig, final InternalTopicConfig internalTopicConfig1, final InternalTopicConfig internalTopicConfig2) {529 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicFailFuture1 = new KafkaFutureImpl<>();530 createTopicFailFuture1.completeExceptionally(new TopicExistsException("exists"));531 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicFailFuture2 = new KafkaFutureImpl<>();532 createTopicFailFuture2.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));533 final KafkaFutureImpl<TopicMetadataAndConfig> createTopicSuccessfulFuture = new KafkaFutureImpl<>();534 createTopicSuccessfulFuture.complete(535 new TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList()))536 );537 final NewTopic newTopic1 = newTopic(topic1, internalTopicConfig1, streamsConfig);538 final NewTopic newTopic2 = newTopic(topic2, internalTopicConfig2, streamsConfig);539 EasyMock.expect(admin.createTopics(mkSet(newTopic1, newTopic2)))540 .andAnswer(() -> new MockCreateTopicsResult(mkMap(541 mkEntry(topic1, createTopicSuccessfulFuture),542 mkEntry(topic2, createTopicFailFuture1)543 )));544 EasyMock.expect(admin.createTopics(mkSet(newTopic2)))545 .andAnswer(() -> new MockCreateTopicsResult(mkMap(546 mkEntry(topic2, createTopicFailFuture2)547 )));548 }549 @Test550 public void shouldReturnCorrectPartitionCounts() {551 mockAdminClient.addTopic(552 false,553 topic1,554 Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())),555 null);556 assertEquals(Collections.singletonMap(topic1, 1),557 internalTopicManager.getNumPartitions(Collections.singleton(topic1), Collections.emptySet()));558 }559 @Test560 public void shouldCreateRequiredTopics() throws Exception {561 final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());562 topicConfig.setNumberOfPartitions(1);563 final InternalTopicConfig topicConfig2 = new UnwindowedChangelogTopicConfig(topic2, Collections.emptyMap());564 topicConfig2.setNumberOfPartitions(1);565 final InternalTopicConfig topicConfig3 = new WindowedChangelogTopicConfig(topic3, Collections.emptyMap());566 topicConfig3.setNumberOfPartitions(1);567 internalTopicManager.makeReady(Collections.singletonMap(topic1, topicConfig));568 internalTopicManager.makeReady(Collections.singletonMap(topic2, topicConfig2));569 internalTopicManager.makeReady(Collections.singletonMap(topic3, topicConfig3));570 assertEquals(mkSet(topic1, topic2, topic3), mockAdminClient.listTopics().names().get());571 assertEquals(new TopicDescription(topic1, false, new ArrayList<TopicPartitionInfo>() {572 {573 add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));574 }575 }), mockAdminClient.describeTopics(Collections.singleton(topic1)).topicNameValues().get(topic1).get());576 assertEquals(new TopicDescription(topic2, false, new ArrayList<TopicPartitionInfo>() {577 {578 add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));579 }580 }), mockAdminClient.describeTopics(Collections.singleton(topic2)).topicNameValues().get(topic2).get());581 assertEquals(new TopicDescription(topic3, false, new ArrayList<TopicPartitionInfo>() {582 {583 add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));584 }585 }), mockAdminClient.describeTopics(Collections.singleton(topic3)).topicNameValues().get(topic3).get());586 final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic1);587 final ConfigResource resource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2);588 final ConfigResource resource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3);589 assertEquals(590 new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),591 mockAdminClient.describeConfigs(Collections.singleton(resource)).values().get(resource).get().get(TopicConfig.CLEANUP_POLICY_CONFIG)592 );593 assertEquals(594 new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT),595 mockAdminClient.describeConfigs(Collections.singleton(resource2)).values().get(resource2).get().get(TopicConfig.CLEANUP_POLICY_CONFIG)596 );597 assertEquals(598 new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE),599 mockAdminClient.describeConfigs(Collections.singleton(resource3)).values().get(resource3).get().get(TopicConfig.CLEANUP_POLICY_CONFIG)600 );601 }602 @Test603 public void shouldCompleteTopicValidationOnRetry() {604 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);605 final InternalTopicManager topicManager = new InternalTopicManager(606 time,607 admin,608 new StreamsConfig(config)609 );610 final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1,611 Collections.singletonList(broker1), Collections.singletonList(broker1));612 final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>();613 final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();614 topicDescriptionSuccessFuture.complete(615 new TopicDescription(topic1, false, Collections.singletonList(partitionInfo), Collections.emptySet())616 );617 topicDescriptionFailFuture.completeExceptionally(new UnknownTopicOrPartitionException("KABOOM!"));618 final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();619 topicCreationFuture.completeExceptionally(new TopicExistsException("KABOOM!"));620 // let the first describe succeed on topic, and fail on topic2, and then let creation throws topics-existed;621 // it should retry with just topic2 and then let it succeed622 EasyMock.expect(admin.describeTopics(mkSet(topic1, topic2)))623 .andReturn(new MockDescribeTopicsResult(mkMap(624 mkEntry(topic1, topicDescriptionSuccessFuture),625 mkEntry(topic2, topicDescriptionFailFuture)626 ))).once();627 EasyMock.expect(admin.createTopics(Collections.singleton(new NewTopic(topic2, Optional.of(1), Optional.of((short) 1))628 .configs(mkMap(mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT),629 mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"))))))630 .andReturn(new MockCreateTopicsResult(Collections.singletonMap(topic2, topicCreationFuture))).once();631 EasyMock.expect(admin.describeTopics(Collections.singleton(topic2)))632 .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic2, topicDescriptionSuccessFuture)));633 EasyMock.replay(admin);634 final InternalTopicConfig topicConfig = new UnwindowedChangelogTopicConfig(topic1, Collections.emptyMap());635 topicConfig.setNumberOfPartitions(1);636 final InternalTopicConfig topic2Config = new UnwindowedChangelogTopicConfig(topic2, Collections.emptyMap());637 topic2Config.setNumberOfPartitions(1);638 topicManager.makeReady(mkMap(639 mkEntry(topic1, topicConfig),640 mkEntry(topic2, topic2Config)641 ));642 EasyMock.verify(admin);643 }644 @Test645 public void shouldNotCreateTopicIfExistsWithDifferentPartitions() {646 mockAdminClient.addTopic(647 false,648 topic1,649 new ArrayList<TopicPartitionInfo>() {650 {651 add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));652 add(new TopicPartitionInfo(1, broker1, singleReplica, Collections.emptyList()));653 }654 },655 null);656 try {657 final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());658 internalTopicConfig.setNumberOfPartitions(1);659 internalTopicManager.makeReady(Collections.singletonMap(topic1, internalTopicConfig));660 fail("Should have thrown StreamsException");661 } catch (final StreamsException expected) { /* pass */ }662 }663 @Test664 public void shouldNotThrowExceptionIfExistsWithDifferentReplication() {665 mockAdminClient.addTopic(666 false,667 topic1,668 Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),669 null);670 // attempt to create it again with replication 1671 final InternalTopicManager internalTopicManager2 = new InternalTopicManager(672 time,673 mockAdminClient,674 new StreamsConfig(config)675 );676 final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());677 internalTopicConfig.setNumberOfPartitions(1);678 internalTopicManager2.makeReady(Collections.singletonMap(topic1, internalTopicConfig));679 }680 @Test681 public void shouldNotThrowExceptionForEmptyTopicMap() {682 internalTopicManager.makeReady(Collections.emptyMap());683 }684 @Test685 public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {686 mockAdminClient.timeoutNextRequest(1);687 final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());688 internalTopicConfig.setNumberOfPartitions(1);689 try {690 internalTopicManager.makeReady(Collections.singletonMap(topic1, internalTopicConfig));691 fail("Should have thrown StreamsException.");692 } catch (final StreamsException expected) {693 assertEquals(TimeoutException.class, expected.getCause().getClass());694 }695 }696 @Test697 public void shouldLogWhenTopicNotFoundAndNotThrowException() {698 mockAdminClient.addTopic(699 false,700 topic1,701 Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),702 null);703 final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());704 internalTopicConfig.setNumberOfPartitions(1);705 final InternalTopicConfig internalTopicConfigII =706 new RepartitionTopicConfig("internal-topic", Collections.emptyMap());707 internalTopicConfigII.setNumberOfPartitions(1);708 final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();709 topicConfigMap.put(topic1, internalTopicConfig);710 topicConfigMap.put("internal-topic", internalTopicConfigII);711 LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);712 try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(InternalTopicManager.class)) {713 internalTopicManager.makeReady(topicConfigMap);714 assertThat(715 appender.getMessages(),716 hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +717 "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")718 );719 }720 }721 @Test722 public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {723 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);724 final InternalTopicManager topicManager = new InternalTopicManager(725 time,726 admin,727 new StreamsConfig(config)728 );729 final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();730 topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));731 final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();732 topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));733 final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();734 topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));735 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))736 .andReturn(new MockDescribeTopicsResult(737 Collections.singletonMap(topic1, topicDescriptionLeaderNotAvailableFuture)))738 .once();739 // we would not need to call create-topics for the first time740 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))741 .andReturn(new MockDescribeTopicsResult(742 Collections.singletonMap(topic1, topicDescriptionUnknownTopicFuture)))743 .once();744 EasyMock.expect(admin.createTopics(Collections.singleton(745 new NewTopic(topic1, Optional.of(1), Optional.of((short) 1))746 .configs(mkMap(mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),747 mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),748 mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),749 mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))750 .andReturn(new MockCreateTopicsResult(Collections.singletonMap(topic1, topicCreationFuture))).once();751 EasyMock.replay(admin);752 final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());753 internalTopicConfig.setNumberOfPartitions(1);754 topicManager.makeReady(Collections.singletonMap(topic1, internalTopicConfig));755 EasyMock.verify(admin);756 }757 @Test758 public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {759 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);760 final InternalTopicManager topicManager = new InternalTopicManager(761 time,762 admin,763 new StreamsConfig(config)764 );765 final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1,766 Collections.singletonList(broker1), Collections.singletonList(broker1));767 final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();768 topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));769 final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>();770 topicDescriptionSuccessFuture.complete(771 new TopicDescription(topic1, false, Collections.singletonList(partitionInfo), Collections.emptySet())772 );773 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))774 .andReturn(new MockDescribeTopicsResult(775 Collections.singletonMap(topic1, topicDescriptionFailFuture)))776 .once();777 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))778 .andReturn(new MockDescribeTopicsResult(779 Collections.singletonMap(topic1, topicDescriptionSuccessFuture)))780 .once();781 EasyMock.replay(admin);782 final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());783 internalTopicConfig.setNumberOfPartitions(1);784 topicManager.makeReady(Collections.singletonMap(topic1, internalTopicConfig));785 EasyMock.verify(admin);786 }787 @Test788 public void shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable() {789 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);790 final MockTime time = new MockTime(791 (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 15792 );793 final InternalTopicManager topicManager = new InternalTopicManager(794 time,795 admin,796 new StreamsConfig(config)797 );798 final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();799 topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));800 // simulate describeTopics got LeaderNotAvailableException801 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))802 .andReturn(new MockDescribeTopicsResult(803 Collections.singletonMap(topic1, topicDescriptionFailFuture)))804 .anyTimes();805 EasyMock.replay(admin);806 final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());807 internalTopicConfig.setNumberOfPartitions(1);808 final TimeoutException exception = assertThrows(809 TimeoutException.class,810 () -> topicManager.makeReady(Collections.singletonMap(topic1, internalTopicConfig))811 );812 assertNull(exception.getCause());813 assertThat(814 exception.getMessage(),815 equalTo("Could not create topics within 50 milliseconds." +816 " This can happen if the Kafka cluster is temporarily not available.")817 );818 EasyMock.verify(admin);819 }820 @Test821 public void shouldExhaustRetriesOnMarkedForDeletionTopic() {822 mockAdminClient.addTopic(823 false,824 topic1,825 Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),826 null);827 mockAdminClient.markTopicForDeletion(topic1);828 final MockTime time = new MockTime(829 (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 15830 );831 final InternalTopicManager internalTopicManager =832 new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config));833 final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());834 internalTopicConfig.setNumberOfPartitions(1);835 final TimeoutException exception = assertThrows(836 TimeoutException.class,837 () -> internalTopicManager.makeReady(Collections.singletonMap(topic1, internalTopicConfig))838 );839 assertNull(exception.getCause());840 assertThat(841 exception.getMessage(),842 equalTo("Could not create topics within 50 milliseconds." +843 " This can happen if the Kafka cluster is temporarily not available.")844 );845 }846 @Test847 public void shouldValidateSuccessfully() {848 setupTopicInMockAdminClient(topic1, repartitionTopicConfig());849 setupTopicInMockAdminClient(topic2, repartitionTopicConfig());850 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);851 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);852 final ValidationResult validationResult = internalTopicManager.validate(mkMap(853 mkEntry(topic1, internalTopicConfig1),854 mkEntry(topic2, internalTopicConfig2)855 ));856 assertThat(validationResult.missingTopics(), empty());857 assertThat(validationResult.misconfigurationsForTopics(), anEmptyMap());858 }859 @Test860 public void shouldValidateSuccessfullyWithEmptyInternalTopics() {861 setupTopicInMockAdminClient(topic1, repartitionTopicConfig());862 final ValidationResult validationResult = internalTopicManager.validate(Collections.emptyMap());863 assertThat(validationResult.missingTopics(), empty());864 assertThat(validationResult.misconfigurationsForTopics(), anEmptyMap());865 }866 @Test867 public void shouldReportMissingTopics() {868 final String missingTopic1 = "missingTopic1";869 final String missingTopic2 = "missingTopic2";870 setupTopicInMockAdminClient(topic1, repartitionTopicConfig());871 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);872 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(missingTopic1, 1);873 final InternalTopicConfig internalTopicConfig3 = setupRepartitionTopicConfig(missingTopic2, 1);874 final ValidationResult validationResult = internalTopicManager.validate(mkMap(875 mkEntry(topic1, internalTopicConfig1),876 mkEntry(missingTopic1, internalTopicConfig2),877 mkEntry(missingTopic2, internalTopicConfig3)878 ));879 final Set<String> missingTopics = validationResult.missingTopics();880 assertThat(missingTopics.size(), is(2));881 assertThat(missingTopics, hasItem(missingTopic1));882 assertThat(missingTopics, hasItem(missingTopic2));883 assertThat(validationResult.misconfigurationsForTopics(), anEmptyMap());884 }885 @Test886 public void shouldReportMisconfigurationsOfPartitionCount() {887 setupTopicInMockAdminClient(topic1, repartitionTopicConfig());888 setupTopicInMockAdminClient(topic2, repartitionTopicConfig());889 setupTopicInMockAdminClient(topic3, repartitionTopicConfig());890 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 2);891 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 3);892 final InternalTopicConfig internalTopicConfig3 = setupRepartitionTopicConfig(topic3, 1);893 final ValidationResult validationResult = internalTopicManager.validate(mkMap(894 mkEntry(topic1, internalTopicConfig1),895 mkEntry(topic2, internalTopicConfig2),896 mkEntry(topic3, internalTopicConfig3)897 ));898 final Map<String, List<String>> misconfigurationsForTopics = validationResult.misconfigurationsForTopics();899 assertThat(validationResult.missingTopics(), empty());900 assertThat(misconfigurationsForTopics.size(), is(2));901 assertThat(misconfigurationsForTopics, hasKey(topic1));902 assertThat(misconfigurationsForTopics.get(topic1).size(), is(1));903 assertThat(904 misconfigurationsForTopics.get(topic1).get(0),905 is("Internal topic " + topic1 + " requires 2 partitions, but the existing topic on the broker has 1 partitions.")906 );907 assertThat(misconfigurationsForTopics, hasKey(topic2));908 assertThat(misconfigurationsForTopics.get(topic2).size(), is(1));909 assertThat(910 misconfigurationsForTopics.get(topic2).get(0),911 is("Internal topic " + topic2 + " requires 3 partitions, but the existing topic on the broker has 1 partitions.")912 );913 assertThat(misconfigurationsForTopics, not(hasKey(topic3)));914 }915 @Test916 public void shouldReportMisconfigurationsOfCleanupPolicyForUnwindowedChangelogTopics() {917 final Map<String, String> unwindowedChangelogConfigWithDeleteCleanupPolicy = unwindowedChangelogConfig();918 unwindowedChangelogConfigWithDeleteCleanupPolicy.put(919 TopicConfig.CLEANUP_POLICY_CONFIG,920 TopicConfig.CLEANUP_POLICY_DELETE921 );922 setupTopicInMockAdminClient(topic1, unwindowedChangelogConfigWithDeleteCleanupPolicy);923 final Map<String, String> unwindowedChangelogConfigWithDeleteCompactCleanupPolicy = unwindowedChangelogConfig();924 unwindowedChangelogConfigWithDeleteCompactCleanupPolicy.put(925 TopicConfig.CLEANUP_POLICY_CONFIG,926 TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE927 );928 setupTopicInMockAdminClient(topic2, unwindowedChangelogConfigWithDeleteCompactCleanupPolicy);929 setupTopicInMockAdminClient(topic3, unwindowedChangelogConfig());930 final InternalTopicConfig internalTopicConfig1 = setupUnwindowedChangelogTopicConfig(topic1, 1);931 final InternalTopicConfig internalTopicConfig2 = setupUnwindowedChangelogTopicConfig(topic2, 1);932 final InternalTopicConfig internalTopicConfig3 = setupUnwindowedChangelogTopicConfig(topic3, 1);933 final ValidationResult validationResult = internalTopicManager.validate(mkMap(934 mkEntry(topic1, internalTopicConfig1),935 mkEntry(topic2, internalTopicConfig2),936 mkEntry(topic3, internalTopicConfig3)937 ));938 final Map<String, List<String>> misconfigurationsForTopics = validationResult.misconfigurationsForTopics();939 assertThat(validationResult.missingTopics(), empty());940 assertThat(misconfigurationsForTopics.size(), is(2));941 assertThat(misconfigurationsForTopics, hasKey(topic1));942 assertThat(misconfigurationsForTopics.get(topic1).size(), is(1));943 assertThat(944 misconfigurationsForTopics.get(topic1).get(0),945 is("Cleanup policy (" + TopicConfig.CLEANUP_POLICY_CONFIG + ") of existing internal topic " + topic1 + " should not contain \""946 + TopicConfig.CLEANUP_POLICY_DELETE + "\".")947 );948 assertThat(misconfigurationsForTopics, hasKey(topic2));949 assertThat(misconfigurationsForTopics.get(topic2).size(), is(1));950 assertThat(951 misconfigurationsForTopics.get(topic2).get(0),952 is("Cleanup policy (" + TopicConfig.CLEANUP_POLICY_CONFIG + ") of existing internal topic " + topic2 + " should not contain \""953 + TopicConfig.CLEANUP_POLICY_DELETE + "\".")954 );955 assertThat(misconfigurationsForTopics, not(hasKey(topic3)));956 }957 @Test958 public void shouldReportMisconfigurationsOfCleanupPolicyForWindowedChangelogTopics() {959 final long retentionMs = 1000;960 final long shorterRetentionMs = 900;961 setupTopicInMockAdminClient(topic1, windowedChangelogConfig(retentionMs));962 setupTopicInMockAdminClient(topic2, windowedChangelogConfig(shorterRetentionMs));963 final Map<String, String> windowedChangelogConfigOnlyCleanupPolicyCompact = windowedChangelogConfig(retentionMs);964 windowedChangelogConfigOnlyCleanupPolicyCompact.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);965 setupTopicInMockAdminClient(topic3, windowedChangelogConfigOnlyCleanupPolicyCompact);966 final Map<String, String> windowedChangelogConfigOnlyCleanupPolicyDelete = windowedChangelogConfig(shorterRetentionMs);967 windowedChangelogConfigOnlyCleanupPolicyDelete.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);968 setupTopicInMockAdminClient(topic4, windowedChangelogConfigOnlyCleanupPolicyDelete);969 final Map<String, String> windowedChangelogConfigWithRetentionBytes = windowedChangelogConfig(retentionMs);970 windowedChangelogConfigWithRetentionBytes.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024");971 setupTopicInMockAdminClient(topic5, windowedChangelogConfigWithRetentionBytes);972 final InternalTopicConfig internalTopicConfig1 = setupWindowedChangelogTopicConfig(topic1, 1, retentionMs);973 final InternalTopicConfig internalTopicConfig2 = setupWindowedChangelogTopicConfig(topic2, 1, retentionMs);974 final InternalTopicConfig internalTopicConfig3 = setupWindowedChangelogTopicConfig(topic3, 1, retentionMs);975 final InternalTopicConfig internalTopicConfig4 = setupWindowedChangelogTopicConfig(topic4, 1, retentionMs);976 final InternalTopicConfig internalTopicConfig5 = setupWindowedChangelogTopicConfig(topic5, 1, retentionMs);977 final ValidationResult validationResult = internalTopicManager.validate(mkMap(978 mkEntry(topic1, internalTopicConfig1),979 mkEntry(topic2, internalTopicConfig2),980 mkEntry(topic3, internalTopicConfig3),981 mkEntry(topic4, internalTopicConfig4),982 mkEntry(topic5, internalTopicConfig5)983 ));984 final Map<String, List<String>> misconfigurationsForTopics = validationResult.misconfigurationsForTopics();985 assertThat(validationResult.missingTopics(), empty());986 assertThat(misconfigurationsForTopics.size(), is(3));987 assertThat(misconfigurationsForTopics, hasKey(topic2));988 assertThat(misconfigurationsForTopics.get(topic2).size(), is(1));989 assertThat(990 misconfigurationsForTopics.get(topic2).get(0),991 is("Retention time (" + TopicConfig.RETENTION_MS_CONFIG + ") of existing internal topic " +992 topic2 + " is " + shorterRetentionMs + " but should be " + retentionMs + " or larger.")993 );994 assertThat(misconfigurationsForTopics, hasKey(topic4));995 assertThat(misconfigurationsForTopics.get(topic4).size(), is(1));996 assertThat(997 misconfigurationsForTopics.get(topic4).get(0),998 is("Retention time (" + TopicConfig.RETENTION_MS_CONFIG + ") of existing internal topic " +999 topic4 + " is " + shorterRetentionMs + " but should be " + retentionMs + " or larger.")1000 );1001 assertThat(misconfigurationsForTopics, hasKey(topic5));1002 assertThat(misconfigurationsForTopics.get(topic5).size(), is(1));1003 assertThat(1004 misconfigurationsForTopics.get(topic5).get(0),1005 is("Retention byte (" + TopicConfig.RETENTION_BYTES_CONFIG + ") of existing internal topic " +1006 topic5 + " is set but it should be unset.")1007 );1008 assertThat(misconfigurationsForTopics, not(hasKey(topic1)));1009 assertThat(misconfigurationsForTopics, not(hasKey(topic3)));1010 }1011 @Test1012 public void shouldReportMisconfigurationsOfCleanupPolicyForRepartitionTopics() {1013 final long retentionMs = 1000;1014 setupTopicInMockAdminClient(topic1, repartitionTopicConfig());1015 final Map<String, String> repartitionTopicConfigCleanupPolicyCompact = repartitionTopicConfig();1016 repartitionTopicConfigCleanupPolicyCompact.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);1017 setupTopicInMockAdminClient(topic2, repartitionTopicConfigCleanupPolicyCompact);1018 final Map<String, String> repartitionTopicConfigCleanupPolicyCompactAndDelete = repartitionTopicConfig();1019 repartitionTopicConfigCleanupPolicyCompactAndDelete.put(1020 TopicConfig.CLEANUP_POLICY_CONFIG,1021 TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE1022 );1023 setupTopicInMockAdminClient(topic3, repartitionTopicConfigCleanupPolicyCompactAndDelete);1024 final Map<String, String> repartitionTopicConfigWithFiniteRetentionMs = repartitionTopicConfig();1025 repartitionTopicConfigWithFiniteRetentionMs.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs));1026 setupTopicInMockAdminClient(topic4, repartitionTopicConfigWithFiniteRetentionMs);1027 final Map<String, String> repartitionTopicConfigWithRetentionBytesSet = repartitionTopicConfig();1028 repartitionTopicConfigWithRetentionBytesSet.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024");1029 setupTopicInMockAdminClient(topic5, repartitionTopicConfigWithRetentionBytesSet);1030 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);1031 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);1032 final InternalTopicConfig internalTopicConfig3 = setupRepartitionTopicConfig(topic3, 1);1033 final InternalTopicConfig internalTopicConfig4 = setupRepartitionTopicConfig(topic4, 1);1034 final InternalTopicConfig internalTopicConfig5 = setupRepartitionTopicConfig(topic5, 1);1035 final ValidationResult validationResult = internalTopicManager.validate(mkMap(1036 mkEntry(topic1, internalTopicConfig1),1037 mkEntry(topic2, internalTopicConfig2),1038 mkEntry(topic3, internalTopicConfig3),1039 mkEntry(topic4, internalTopicConfig4),1040 mkEntry(topic5, internalTopicConfig5)1041 ));1042 final Map<String, List<String>> misconfigurationsForTopics = validationResult.misconfigurationsForTopics();1043 assertThat(validationResult.missingTopics(), empty());1044 assertThat(misconfigurationsForTopics.size(), is(4));1045 assertThat(misconfigurationsForTopics, hasKey(topic2));1046 assertThat(misconfigurationsForTopics.get(topic2).size(), is(1));1047 assertThat(1048 misconfigurationsForTopics.get(topic2).get(0),1049 is("Cleanup policy (" + TopicConfig.CLEANUP_POLICY_CONFIG + ") of existing internal topic "1050 + topic2 + " should not contain \"" + TopicConfig.CLEANUP_POLICY_COMPACT + "\".")1051 );1052 assertThat(misconfigurationsForTopics, hasKey(topic3));1053 assertThat(misconfigurationsForTopics.get(topic3).size(), is(1));1054 assertThat(1055 misconfigurationsForTopics.get(topic3).get(0),1056 is("Cleanup policy (" + TopicConfig.CLEANUP_POLICY_CONFIG + ") of existing internal topic "1057 + topic3 + " should not contain \"" + TopicConfig.CLEANUP_POLICY_COMPACT + "\".")1058 );1059 assertThat(misconfigurationsForTopics, hasKey(topic4));1060 assertThat(misconfigurationsForTopics.get(topic4).size(), is(1));1061 assertThat(1062 misconfigurationsForTopics.get(topic4).get(0),1063 is("Retention time (" + TopicConfig.RETENTION_MS_CONFIG + ") of existing internal topic "1064 + topic4 + " is " + retentionMs + " but should be -1.")1065 );1066 assertThat(misconfigurationsForTopics, hasKey(topic5));1067 assertThat(misconfigurationsForTopics.get(topic5).size(), is(1));1068 assertThat(1069 misconfigurationsForTopics.get(topic5).get(0),1070 is("Retention byte (" + TopicConfig.RETENTION_BYTES_CONFIG + ") of existing internal topic "1071 + topic5 + " is set but it should be unset.")1072 );1073 }1074 @Test1075 public void shouldReportMultipleMisconfigurationsForSameTopic() {1076 final long retentionMs = 1000;1077 final long shorterRetentionMs = 900;1078 final Map<String, String> windowedChangelogConfig = windowedChangelogConfig(shorterRetentionMs);1079 windowedChangelogConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024");1080 setupTopicInMockAdminClient(topic1, windowedChangelogConfig);1081 final InternalTopicConfig internalTopicConfig1 = setupWindowedChangelogTopicConfig(topic1, 1, retentionMs);1082 final ValidationResult validationResult = internalTopicManager.validate(mkMap(1083 mkEntry(topic1, internalTopicConfig1)1084 ));1085 final Map<String, List<String>> misconfigurationsForTopics = validationResult.misconfigurationsForTopics();1086 assertThat(validationResult.missingTopics(), empty());1087 assertThat(misconfigurationsForTopics.size(), is(1));1088 assertThat(misconfigurationsForTopics, hasKey(topic1));1089 assertThat(misconfigurationsForTopics.get(topic1).size(), is(2));1090 assertThat(1091 misconfigurationsForTopics.get(topic1).get(0),1092 is("Retention time (" + TopicConfig.RETENTION_MS_CONFIG + ") of existing internal topic " +1093 topic1 + " is " + shorterRetentionMs + " but should be " + retentionMs + " or larger.")1094 );1095 assertThat(1096 misconfigurationsForTopics.get(topic1).get(1),1097 is("Retention byte (" + TopicConfig.RETENTION_BYTES_CONFIG + ") of existing internal topic " +1098 topic1 + " is set but it should be unset.")1099 );1100 }1101 @Test1102 public void shouldThrowWhenPartitionCountUnknown() {1103 setupTopicInMockAdminClient(topic1, repartitionTopicConfig());1104 final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());1105 assertThrows(1106 IllegalStateException.class,1107 () -> internalTopicManager.validate(Collections.singletonMap(topic1, internalTopicConfig))1108 );1109 }1110 @Test1111 public void shouldNotThrowExceptionIfTopicExistsWithDifferentReplication() {1112 setupTopicInMockAdminClient(topic1, repartitionTopicConfig());1113 // attempt to create it again with replication 11114 final InternalTopicManager internalTopicManager2 = new InternalTopicManager(1115 time,1116 mockAdminClient,1117 new StreamsConfig(config)1118 );1119 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);1120 final ValidationResult validationResult =1121 internalTopicManager2.validate(Collections.singletonMap(topic1, internalTopicConfig));1122 assertThat(validationResult.missingTopics(), empty());1123 assertThat(validationResult.misconfigurationsForTopics(), anEmptyMap());1124 }1125 @Test1126 public void shouldRetryWhenCallsThrowTimeoutExceptionDuringValidation() {1127 setupTopicInMockAdminClient(topic1, repartitionTopicConfig());1128 mockAdminClient.timeoutNextRequest(2);1129 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);1130 final ValidationResult validationResult = internalTopicManager.validate(Collections.singletonMap(topic1, internalTopicConfig));1131 assertThat(validationResult.missingTopics(), empty());1132 assertThat(validationResult.misconfigurationsForTopics(), anEmptyMap());1133 }1134 @Test1135 public void shouldOnlyRetryDescribeTopicsWhenDescribeTopicsThrowsLeaderNotAvailableExceptionDuringValidation() {1136 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);1137 final InternalTopicManager topicManager = new InternalTopicManager(1138 time,1139 admin,1140 new StreamsConfig(config)1141 );1142 final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();1143 topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));1144 final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessfulFuture = new KafkaFutureImpl<>();1145 topicDescriptionSuccessfulFuture.complete(new TopicDescription(1146 topic1,1147 false,1148 Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList()))1149 ));1150 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))1151 .andReturn(new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFailFuture))))1152 .andReturn(new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionSuccessfulFuture))));1153 final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();1154 topicConfigSuccessfulFuture.complete(1155 new Config(repartitionTopicConfig().entrySet().stream()1156 .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())).collect(Collectors.toSet()))1157 );1158 final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1);1159 EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))1160 .andReturn(new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture))));1161 EasyMock.replay(admin);1162 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);1163 final ValidationResult validationResult = topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig));1164 assertThat(validationResult.missingTopics(), empty());1165 assertThat(validationResult.misconfigurationsForTopics(), anEmptyMap());1166 EasyMock.verify(admin);1167 }1168 @Test1169 public void shouldOnlyRetryDescribeConfigsWhenDescribeConfigsThrowsLeaderNotAvailableExceptionDuringValidation() {1170 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);1171 final InternalTopicManager topicManager = new InternalTopicManager(1172 time,1173 admin,1174 new StreamsConfig(config)1175 );1176 final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessfulFuture = new KafkaFutureImpl<>();1177 topicDescriptionSuccessfulFuture.complete(new TopicDescription(1178 topic1,1179 false,1180 Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList()))1181 ));1182 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))1183 .andReturn(new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionSuccessfulFuture))));1184 final KafkaFutureImpl<Config> topicConfigsFailFuture = new KafkaFutureImpl<>();1185 topicConfigsFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));1186 final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();1187 topicConfigSuccessfulFuture.complete(1188 new Config(repartitionTopicConfig().entrySet().stream()1189 .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())).collect(Collectors.toSet()))1190 );1191 final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1);1192 EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))1193 .andReturn(new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigsFailFuture))))1194 .andReturn(new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture))));1195 EasyMock.replay(admin);1196 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);1197 final ValidationResult validationResult = topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig));1198 assertThat(validationResult.missingTopics(), empty());1199 assertThat(validationResult.misconfigurationsForTopics(), anEmptyMap());1200 EasyMock.verify(admin);1201 }1202 @Test1203 public void shouldOnlyRetryNotSuccessfulFuturesDuringValidation() {1204 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);1205 final InternalTopicManager topicManager = new InternalTopicManager(1206 time,1207 admin,1208 new StreamsConfig(config)1209 );1210 final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();1211 topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));1212 final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessfulFuture1 = new KafkaFutureImpl<>();1213 topicDescriptionSuccessfulFuture1.complete(new TopicDescription(1214 topic1,1215 false,1216 Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList()))1217 ));1218 final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessfulFuture2 = new KafkaFutureImpl<>();1219 topicDescriptionSuccessfulFuture2.complete(new TopicDescription(1220 topic2,1221 false,1222 Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList()))1223 ));1224 EasyMock.expect(admin.describeTopics(mkSet(topic1, topic2)))1225 .andAnswer(() -> new MockDescribeTopicsResult(mkMap(1226 mkEntry(topic1, topicDescriptionSuccessfulFuture1),1227 mkEntry(topic2, topicDescriptionFailFuture)1228 )));1229 EasyMock.expect(admin.describeTopics(mkSet(topic2)))1230 .andAnswer(() -> new MockDescribeTopicsResult(mkMap(1231 mkEntry(topic2, topicDescriptionSuccessfulFuture2)1232 )));1233 final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();1234 topicConfigSuccessfulFuture.complete(1235 new Config(repartitionTopicConfig().entrySet().stream()1236 .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())).collect(Collectors.toSet()))1237 );1238 final ConfigResource topicResource1 = new ConfigResource(Type.TOPIC, topic1);1239 final ConfigResource topicResource2 = new ConfigResource(Type.TOPIC, topic2);1240 EasyMock.expect(admin.describeConfigs(mkSet(topicResource1, topicResource2)))1241 .andAnswer(() -> new MockDescribeConfigsResult(mkMap(1242 mkEntry(topicResource1, topicConfigSuccessfulFuture),1243 mkEntry(topicResource2, topicConfigSuccessfulFuture)1244 )));1245 EasyMock.replay(admin);1246 final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);1247 final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);1248 final ValidationResult validationResult = topicManager.validate(mkMap(1249 mkEntry(topic1, internalTopicConfig1),1250 mkEntry(topic2, internalTopicConfig2)1251 ));1252 assertThat(validationResult.missingTopics(), empty());1253 assertThat(validationResult.misconfigurationsForTopics(), anEmptyMap());1254 EasyMock.verify(admin);1255 }1256 @Test1257 public void shouldThrowWhenDescribeTopicsThrowsUnexpectedExceptionDuringValidation() {1258 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);1259 final InternalTopicManager topicManager = new InternalTopicManager(1260 time,1261 admin,1262 new StreamsConfig(config)1263 );1264 final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();1265 topicDescriptionFailFuture.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));1266 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))1267 .andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFailFuture))));1268 EasyMock.replay(admin);1269 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);1270 assertThrows(Throwable.class, () -> topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig)));1271 }1272 @Test1273 public void shouldThrowWhenDescribeConfigsThrowsUnexpectedExceptionDuringValidation() {1274 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);1275 final InternalTopicManager topicManager = new InternalTopicManager(1276 time,1277 admin,1278 new StreamsConfig(config)1279 );1280 final KafkaFutureImpl<Config> configDescriptionFailFuture = new KafkaFutureImpl<>();1281 configDescriptionFailFuture.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));1282 final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1);1283 EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))1284 .andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, configDescriptionFailFuture))));1285 EasyMock.replay(admin);1286 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);1287 assertThrows(Throwable.class, () -> topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig)));1288 }1289 @Test1290 public void shouldThrowWhenTopicDescriptionsDoNotContainTopicDuringValidation() {1291 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);1292 final InternalTopicManager topicManager = new InternalTopicManager(1293 time,1294 admin,1295 new StreamsConfig(config)1296 );1297 final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessfulFuture = new KafkaFutureImpl<>();1298 topicDescriptionSuccessfulFuture.complete(new TopicDescription(1299 topic1,1300 false,1301 Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList()))1302 ));1303 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))1304 .andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic2, topicDescriptionSuccessfulFuture))));1305 final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();1306 topicConfigSuccessfulFuture.complete(new Config(Collections.emptySet()));1307 final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1);1308 EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))1309 .andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture))));1310 EasyMock.replay(admin);1311 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);1312 assertThrows(1313 IllegalStateException.class,1314 () -> topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig))1315 );1316 }1317 @Test1318 public void shouldThrowWhenConfigDescriptionsDoNotContainTopicDuringValidation() {1319 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);1320 final InternalTopicManager topicManager = new InternalTopicManager(1321 time,1322 admin,1323 new StreamsConfig(config)1324 );1325 final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessfulFuture = new KafkaFutureImpl<>();1326 topicDescriptionSuccessfulFuture.complete(new TopicDescription(1327 topic1,1328 false,1329 Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList()))1330 ));1331 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))1332 .andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionSuccessfulFuture))));1333 final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();1334 topicConfigSuccessfulFuture.complete(new Config(Collections.emptySet()));1335 final ConfigResource topicResource1 = new ConfigResource(Type.TOPIC, topic1);1336 final ConfigResource topicResource2 = new ConfigResource(Type.TOPIC, topic2);1337 EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource1)))1338 .andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource2, topicConfigSuccessfulFuture))));1339 EasyMock.replay(admin);1340 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);1341 assertThrows(1342 IllegalStateException.class,1343 () -> topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig))1344 );1345 }1346 @Test1347 public void shouldThrowWhenConfigDescriptionsDoNotCleanupPolicyForUnwindowedConfigDuringValidation() {1348 shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(1349 setupUnwindowedChangelogTopicConfig(topic1, 1),1350 configWithoutKey(unwindowedChangelogConfig(), TopicConfig.CLEANUP_POLICY_CONFIG)1351 );1352 }1353 @Test1354 public void shouldThrowWhenConfigDescriptionsDoNotContainCleanupPolicyForWindowedConfigDuringValidation() {1355 final long retentionMs = 1000;1356 shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(1357 setupWindowedChangelogTopicConfig(topic1, 1, retentionMs),1358 configWithoutKey(windowedChangelogConfig(retentionMs), TopicConfig.CLEANUP_POLICY_CONFIG)1359 );1360 }1361 @Test1362 public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionMsForWindowedConfigDuringValidation() {1363 final long retentionMs = 1000;1364 shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(1365 setupWindowedChangelogTopicConfig(topic1, 1, retentionMs),1366 configWithoutKey(windowedChangelogConfig(retentionMs), TopicConfig.RETENTION_MS_CONFIG)1367 );1368 }1369 @Test1370 public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionBytesForWindowedConfigDuringValidation() {1371 final long retentionMs = 1000;1372 shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(1373 setupWindowedChangelogTopicConfig(topic1, 1, retentionMs),1374 configWithoutKey(windowedChangelogConfig(retentionMs), TopicConfig.RETENTION_BYTES_CONFIG)1375 );1376 }1377 @Test1378 public void shouldThrowWhenConfigDescriptionsDoNotContainCleanupPolicyForRepartitionConfigDuringValidation() {1379 shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(1380 setupRepartitionTopicConfig(topic1, 1),1381 configWithoutKey(repartitionTopicConfig(), TopicConfig.CLEANUP_POLICY_CONFIG)1382 );1383 }1384 @Test1385 public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionMsForRepartitionConfigDuringValidation() {1386 shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(1387 setupRepartitionTopicConfig(topic1, 1),1388 configWithoutKey(repartitionTopicConfig(), TopicConfig.RETENTION_MS_CONFIG)1389 );1390 }1391 @Test1392 public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionBytesForRepartitionConfigDuringValidation() {1393 shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(1394 setupRepartitionTopicConfig(topic1, 1),1395 configWithoutKey(repartitionTopicConfig(), TopicConfig.RETENTION_BYTES_CONFIG)1396 );1397 }1398 private Config configWithoutKey(final Map<String, String> config, final String key) {1399 return new Config(config.entrySet().stream()1400 .filter(entry -> !entry.getKey().equals(key))1401 .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())).collect(Collectors.toSet())1402 );1403 }1404 private void shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(final InternalTopicConfig streamsSideTopicConfig,1405 final Config brokerSideTopicConfig) {1406 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);1407 final InternalTopicManager topicManager = new InternalTopicManager(1408 time,1409 admin,1410 new StreamsConfig(config)1411 );1412 final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessfulFuture = new KafkaFutureImpl<>();1413 topicDescriptionSuccessfulFuture.complete(new TopicDescription(1414 topic1,1415 false,1416 Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList()))1417 ));1418 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))1419 .andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionSuccessfulFuture))));1420 final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();1421 topicConfigSuccessfulFuture.complete(brokerSideTopicConfig);1422 final ConfigResource topicResource1 = new ConfigResource(Type.TOPIC, topic1);1423 EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource1)))1424 .andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource1, topicConfigSuccessfulFuture))));1425 EasyMock.replay(admin);1426 assertThrows(1427 IllegalStateException.class,1428 () -> topicManager.validate(Collections.singletonMap(topic1, streamsSideTopicConfig))1429 );1430 }1431 @Test1432 public void shouldThrowTimeoutExceptionWhenTimeoutIsExceededDuringValidation() {1433 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);1434 final MockTime time = new MockTime(1435 (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 31436 );1437 final InternalTopicManager topicManager = new InternalTopicManager(1438 time,1439 admin,1440 new StreamsConfig(config)1441 );1442 final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();1443 topicDescriptionFailFuture.completeExceptionally(new TimeoutException());1444 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))1445 .andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFailFuture))));1446 final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();1447 topicConfigSuccessfulFuture.complete(1448 new Config(repartitionTopicConfig().entrySet().stream()1449 .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())).collect(Collectors.toSet()))1450 );1451 final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1);1452 EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))1453 .andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture))));1454 EasyMock.replay(admin);1455 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);1456 assertThrows(1457 TimeoutException.class,1458 () -> topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig))1459 );1460 }1461 @Test1462 public void shouldThrowTimeoutExceptionWhenFuturesNeverCompleteDuringValidation() {1463 final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);1464 final MockTime time = new MockTime(1465 (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 31466 );1467 final InternalTopicManager topicManager = new InternalTopicManager(1468 time,1469 admin,1470 new StreamsConfig(config)1471 );1472 final KafkaFutureImpl<TopicDescription> topicDescriptionFutureThatNeverCompletes = new KafkaFutureImpl<>();1473 EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))1474 .andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFutureThatNeverCompletes))));1475 final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();1476 topicConfigSuccessfulFuture.complete(1477 new Config(repartitionTopicConfig().entrySet().stream()1478 .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())).collect(Collectors.toSet()))1479 );1480 final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1);1481 EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))1482 .andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture))));1483 EasyMock.replay(admin);1484 final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);1485 assertThrows(1486 TimeoutException.class,1487 () -> topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig))1488 );1489 }1490 private NewTopic newTopic(final String topicName,1491 final InternalTopicConfig topicConfig,1492 final StreamsConfig streamsConfig) {1493 return new NewTopic(1494 topicName,1495 topicConfig.numberOfPartitions(),1496 Optional.of(streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue())1497 ).configs(topicConfig.getProperties(1498 Collections.emptyMap(),1499 streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG))1500 );1501 }1502 private Map<String, String> repartitionTopicConfig() {1503 return mkMap(1504 mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),1505 mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"),1506 mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)1507 );1508 }1509 private Map<String, String> unwindowedChangelogConfig() {1510 return mkMap(1511 mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)1512 );1513 }1514 private Map<String, String> windowedChangelogConfig(final long retentionMs) {1515 return mkMap(1516 mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE),1517 mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs)),1518 mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)1519 );1520 }1521 private void setupTopicInMockAdminClient(final String topic, final Map<String, String> topicConfig) {1522 mockAdminClient.addTopic(1523 false,1524 topic,1525 Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),1526 topicConfig1527 );1528 }1529 private InternalTopicConfig setupUnwindowedChangelogTopicConfig(final String topicName,1530 final int partitionCount) {1531 final InternalTopicConfig internalTopicConfig =1532 new UnwindowedChangelogTopicConfig(topicName, Collections.emptyMap());1533 internalTopicConfig.setNumberOfPartitions(partitionCount);1534 return internalTopicConfig;1535 }1536 private InternalTopicConfig setupWindowedChangelogTopicConfig(final String topicName,1537 final int partitionCount,1538 final long retentionMs) {1539 final InternalTopicConfig internalTopicConfig = new WindowedChangelogTopicConfig(1540 topicName,1541 mkMap(mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs)))1542 );1543 internalTopicConfig.setNumberOfPartitions(partitionCount);1544 return internalTopicConfig;1545 }1546 private InternalTopicConfig setupRepartitionTopicConfig(final String topicName,1547 final int partitionCount) {1548 final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topicName, Collections.emptyMap());1549 internalTopicConfig.setNumberOfPartitions(partitionCount);1550 return internalTopicConfig;1551 }1552 private static class MockCreateTopicsResult extends CreateTopicsResult {1553 MockCreateTopicsResult(final Map<String, KafkaFuture<TopicMetadataAndConfig>> futures) {1554 super(futures);1555 }1556 }1557 private static class MockDeleteTopicsResult extends DeleteTopicsResult {1558 MockDeleteTopicsResult(final Map<String, KafkaFuture<Void>> futures) {1559 super(null, futures);1560 }1561 }1562 private static class MockDescribeTopicsResult extends DescribeTopicsResult {1563 MockDescribeTopicsResult(final Map<String, KafkaFuture<TopicDescription>> futures) {1564 super(null, futures);1565 }1566 }1567 private static class MockDescribeConfigsResult extends DescribeConfigsResult {1568 MockDescribeConfigsResult(final Map<ConfigResource, KafkaFuture<Config>> futures) {1569 super(futures);1570 }1571 }1572}...

Full Screen

Full Screen

Source:MockEditingDomainFactory.java Github

copy

Full Screen

...49 reset(result);50 result.execute(isA(Command.class));51 expectLastCall().andAnswer(new IAnswer<Object>() {52 @Override53 public Object answer() throws Throwable {54 Object[] args = getCurrentArguments();55 assert args.length == 1;56 assert args[0] instanceof Command;57 Command cmd = (Command) args[0];58 if (cmd instanceof RecordingCommand) {59 RecordingCommand rc = (RecordingCommand) cmd;60 rc.execute();61 }62 return cmd;63 }64 }).anyTimes();65 replay(result);66 return result;67 }...

Full Screen

Full Screen

answer

Using AI Code Generation

copy

Full Screen

1import org.easymock.internal.Result;2import org.easymock.internal.MocksControl;3{4 public static void main(String[] args)5 {6 Result result = new Result();7 MocksControl control = new MocksControl();8 control.setDefaultResult(result);9 control.setDefaultMatcher(null);10 control.setDefaultState(null);11 control.setDefaultThrowable(null);12 control.setDefaultObject(null);13 result.answer();14 }15}16import org.easymock.internal.Result;17import org.easymock.internal.MocksControl;18{19 public static void main(String[] args)20 {21 Result result = new Result();22 MocksControl control = new MocksControl();23 control.setDefaultResult(result);24 control.setDefaultMatcher(null);25 control.setDefaultState(null);26 control.setDefaultThrowable(null);27 control.setDefaultObject(null);28 control.answer();29 }30}31import org.easymock.internal.Result;32import org.easymock.internal.MocksControl;33{34 public static void main(String[] args)35 {36 Result result = new Result();37 MocksControl control = new MocksControl();38 control.setDefaultResult(result);39 control.setDefaultMatcher(null);40 control.setDefaultState(null);41 control.setDefaultThrowable(null);42 control.setDefaultObject(null);43 control.answer(null);44 }45}46import org.easymock.internal.Result;47import org.easymock.internal.MocksControl;48{49 public static void main(String[] args)50 {51 Result result = new Result();52 MocksControl control = new MocksControl();53 control.setDefaultResult(result);54 control.setDefaultMatcher(null);55 control.setDefaultState(null);56 control.setDefaultThrowable(null);57 control.setDefaultObject(null);58 control.answer(null, null);59 }60}61import org.easymock.internal.Result;62import org.easymock.internal.MocksControl;63{64 public static void main(String[] args)

Full Screen

Full Screen

answer

Using AI Code Generation

copy

Full Screen

1package org.easymock.internal;2import java.lang.reflect.Method;3public class 1 {4 public static void main(String[] args) throws Exception {5 Class<?> clazz = Class.forName("org.easymock.internal.Result");6 Method method = clazz.getDeclaredMethod("answer", Object.class);7 method.setAccessible(true);8 method.invoke(new Result(), new Object());9 }10}11package org.easymock.internal;12import java.lang.reflect.Method;13public class 2 {14 public static void main(String[] args) throws Exception {15 Class<?> clazz = Class.forName("org.easymock.internal.Result");16 Method method = clazz.getDeclaredMethod("answer", Object.class);17 method.setAccessible(true);18 method.invoke(new Result(), new Object());19 }20}21package org.easymock.internal;22import java.lang.reflect.Method;23public class 3 {24 public static void main(String[] args) throws Exception {25 Class<?> clazz = Class.forName("org.easymock.internal.Result");26 Method method = clazz.getDeclaredMethod("answer", Object.class);27 method.setAccessible(true);28 method.invoke(new Result(), new Object());29 }30}31package org.easymock.internal;32import java.lang.reflect.Method;33public class 4 {34 public static void main(String[] args) throws Exception {35 Class<?> clazz = Class.forName("org.easymock.internal.Result");36 Method method = clazz.getDeclaredMethod("answer", Object.class);37 method.setAccessible(true);38 method.invoke(new Result(), new Object());39 }40}41package org.easymock.internal;42import java.lang.reflect.Method;43public class 5 {44 public static void main(String[] args) throws Exception {45 Class<?> clazz = Class.forName("org.easymock.internal.Result");46 Method method = clazz.getDeclaredMethod("answer", Object.class);47 method.setAccessible(true);48 method.invoke(new Result(), new Object());49 }50}

Full Screen

Full Screen

answer

Using AI Code Generation

copy

Full Screen

1public class Test {2 public static void main(String[] args) {3 Result result = new Result();4 System.out.println(result.answer());5 }6}7public class Test {8 public static void main(String[] args) {9 org.easymock.internal.Result result = new org.easymock.internal.Result();10 System.out.println(result.answer());11 }12}13public class Test {14 public static void main(String[] args) {15 org.easymock.internal.Result result = new org.easymock.internal.Result();16 System.out.println(result.answer());17 }18}19public class Test {20 public static void main(String[] args) {21 org.easymock.internal.Result result = new org.easymock.internal.Result();22 System.out.println(result.answer());23 }24}25public class Test {26 public static void main(String[] args) {27 org.easymock.internal.Result result = new org.easymock.internal.Result();28 System.out.println(result.answer());29 }30}31public class Test {32 public static void main(String[] args) {33 org.easymock.internal.Result result = new org.easymock.internal.Result();34 System.out.println(result.answer());35 }36}37public class Test {38 public static void main(String[] args) {39 org.easymock.internal.Result result = new org.easymock.internal.Result();40 System.out.println(result.answer());41 }42}43public class Test {44 public static void main(String[] args) {45 org.easymock.internal.Result result = new org.easymock.internal.Result();46 System.out.println(result.answer());47 }48}

Full Screen

Full Screen

answer

Using AI Code Generation

copy

Full Screen

1package org.easymock.internal;2public class Result {3 public Object answer(Invocation invocation) {4 return null;5 }6}7package org.easymock.internal;8public class Result {9 public Object answer(Invocation invocation) {10 return null;11 }12}13package org.easymock.internal;14public class Result {15 public Object answer(Invocation invocation) {16 return null;17 }18}19package org.easymock.internal;20public class Result {21 public Object answer(Invocation invocation) {22 return null;23 }24}25package org.easymock.internal;26public class Result {27 public Object answer(Invocation invocation) {28 return null;29 }30}31package org.easymock.internal;32public class Result {33 public Object answer(Invocation invocation) {34 return null;35 }36}37package org.easymock.internal;38public class Result {39 public Object answer(Invocation invocation) {40 return null;41 }42}43package org.easymock.internal;44public class Result {45 public Object answer(Invocation invocation) {46 return null;47 }48}49package org.easymock.internal;50public class Result {51 public Object answer(Invocation invocation) {52 return null;53 }54}55package org.easymock.internal;56public class Result {57 public Object answer(Invocation invocation) {58 return null;59 }60}

Full Screen

Full Screen

answer

Using AI Code Generation

copy

Full Screen

1public class Test {2 public static void main(String[] args) {3 Result result = new Result();4 result.answer(null);5 }6}7public class Test {8 public static void main(String[] args) {9 MocksControl control = new MocksControl(null, null);10 control.answer(null);11 }12}13public class Test {14 public static void main(String[] args) {15 MocksControl control = new MocksControl(null, null);16 control.answer(null, null);17 }18}19public class Test {20 public static void main(String[] args) {21 MocksControl control = new MocksControl(null, null);22 control.answer(null, null, null);23 }24}25public class Test {26 public static void main(String[] args) {27 MocksControl control = new MocksControl(null, null);28 control.answer(null, null, null, null);29 }30}31public class Test {32 public static void main(String[] args) {33 MocksControl control = new MocksControl(null, null);34 control.answer(null, null, null, null, null);35 }36}37public class Test {38 public static void main(String[] args) {39 MocksControl control = new MocksControl(null, null);40 control.answer(null, null, null, null, null, null);41 }42}43public class Test {44 public static void main(String[] args) {45 MocksControl control = new MocksControl(null, null);46 control.answer(null, null, null, null, null, null, null);47 }48}

Full Screen

Full Screen

answer

Using AI Code Generation

copy

Full Screen

1public class 1 {2 public static void main(String[] args) {3 Result result = EasyMock.createMock(Result.class);4 EasyMock.expect(result.answer()).andReturn("EasyMock is easy to use");5 EasyMock.replay(result);6 System.out.println(result.answer());7 EasyMock.verify(result);8 }9}10public class 2 {11 public static void main(String[] args) {12 Result result = EasyMock.createMock(Result.class);13 EasyMock.expect(result.answer()).andAnswer(new IAnswer<String>() {14 public String answer() throws Throwable {15 return "EasyMock is easy to use";16 }17 });18 EasyMock.replay(result);19 System.out.println(result.answer());20 EasyMock.verify(result);21 }22}23public class 3 {24 public static void main(String[] args) {25 Result result = EasyMock.createMock(Result.class);26 EasyMock.expect(result.answer()).andStubAnswer(new IAnswer<String>() {27 public String answer() throws Throwable {28 return "EasyMock is easy to use";29 }30 });31 EasyMock.replay(result);32 System.out.println(result.answer());33 EasyMock.verify(result);34 }35}36public class 4 {37 public static void main(String[] args) {38 Result result = EasyMock.createMock(Result.class);39 EasyMock.expect(result.answer()).andStubReturn("EasyMock is easy to use");40 EasyMock.replay(result);

Full Screen

Full Screen

answer

Using AI Code Generation

copy

Full Screen

1public class 1 {2 public static void main(String[] args) {3 MockControl mockControl = MockControl.createControl(1.class);4 1 mock1 = (1) mockControl.getMock();5 mock1.method1();6 mockControl.setReturnValue("answer");7 mockControl.replay();8 mock1.method1();9 System.out.println(mockControl.getMock().toString());10 mockControl.verify();11 }12}13public class 2 {14 public static void main(String[] args) {15 MockControl mockControl = MockControl.createControl(2.class);16 2 mock2 = (2) mockControl.getMock();17 mock2.method2();18 mockControl.setReturnValue("answer");19 mockControl.replay();20 mock2.method2();21 System.out.println(mockControl.getMock().toString());22 mockControl.verify();23 }24}25public class 3 {26 public static void main(String[] args) {27 MockControl mockControl = MockControl.createControl(3.class);28 3 mock3 = (3) mockControl.getMock();29 mock3.method3();30 mockControl.setReturnValue("answer");31 mockControl.replay();32 mock3.method3();33 System.out.println(mockControl.getMock().toString());34 mockControl.verify();35 }36}37public class 4 {38 public static void main(String[] args) {39 MockControl mockControl = MockControl.createControl(4.class);40 4 mock4 = (4) mockControl.getMock();41 mock4.method4();42 mockControl.setReturnValue("answer");43 mockControl.replay();44 mock4.method4();45 System.out.println(mockControl.getMock().toString());46 mockControl.verify();47 }48}49public class 5 {

Full Screen

Full Screen

answer

Using AI Code Generation

copy

Full Screen

1public class 1 {2 public static void main(String[] args) {3 Result r = new Result();4 r.answer();5 }6}7public class 2 {8 public static void main(String[] args) {9 Result r = new Result();10 r.answer();11 }12}13public class 3 {14 public static void main(String[] args) {15 Result r = new Result();16 r.answer();17 }18}19public class 4 {20 public static void main(String[] args) {21 Result r = new Result();22 r.answer();23 }24}25public class 5 {26 public static void main(String[] args) {27 Result r = new Result();28 r.answer();29 }30}31public class 6 {32 public static void main(String[] args) {33 Result r = new Result();34 r.answer();35 }36}37public class 7 {38 public static void main(String[] args) {39 Result r = new Result();40 r.answer();41 }42}43public class 8 {44 public static void main(String[] args) {45 Result r = new Result();

Full Screen

Full Screen

answer

Using AI Code Generation

copy

Full Screen

1import org.easymock.EasyMock;2import org.easymock.internal.Result;3public class 1 {4 public static void main(String[] args) throws Exception {5 Object mock = EasyMock.createMock(Object.class);6 Result result = new Result();7 result.answer(mock);8 Object answer = result.getAnswer();9 System.out.println(answer);10 }11}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Easymock automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful