Best PowerMock code snippet using org.powermock.api.easymock.PowerMock
Source: DistributedHerderTest.java
...42import org.easymock.IAnswer;43import org.junit.Before;44import org.junit.Test;45import org.junit.runner.RunWith;46import org.powermock.api.easymock.PowerMock;47import org.powermock.api.easymock.annotation.Mock;48import org.powermock.core.classloader.annotations.PowerMockIgnore;49import org.powermock.core.classloader.annotations.PrepareForTest;50import org.powermock.modules.junit4.PowerMockRunner;51import java.util.ArrayList;52import java.util.Arrays;53import java.util.Collection;54import java.util.Collections;55import java.util.HashMap;56import java.util.List;57import java.util.Map;58import java.util.concurrent.ExecutionException;59import java.util.concurrent.TimeUnit;60import java.util.concurrent.TimeoutException;61import static org.junit.Assert.assertEquals;62import static org.junit.Assert.assertTrue;63import static org.junit.Assert.fail;64@RunWith(PowerMockRunner.class)65@PrepareForTest(DistributedHerder.class)66@PowerMockIgnore("javax.management.*")67public class DistributedHerderTest {68    private static final Map<String, String> HERDER_CONFIG = new HashMap<>();69    static {70        HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");71        HERDER_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");72        HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");73        HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-connect-group");74        // The WorkerConfig base class has some required settings without defaults75        HERDER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");76        HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");77        HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");78        HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.json.JsonConverter");79        HERDER_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");80        HERDER_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");81        HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");82    }83    private static final String MEMBER_URL = "memberUrl";84    private static final String CONN1 = "sourceA";85    private static final String CONN2 = "sourceB";86    private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0);87    private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);88    private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);89    private static final Integer MAX_TASKS = 3;90    private static final Map<String, String> CONN1_CONFIG = new HashMap<>();91    static {92        CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);93        CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());94        CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");95        CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());96    }97    private static final Map<String, String> CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG);98    static {99        CONN1_CONFIG_UPDATED.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");100    }101    private static final Map<String, String> CONN2_CONFIG = new HashMap<>();102    static {103        CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2);104        CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());105        CONN2_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");106        CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());107    }108    private static final Map<String, String> TASK_CONFIG = new HashMap<>();109    static {110        TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, 
BogusSourceTask.class.getName());111    }112    private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>();113    static {114        TASK_CONFIGS.add(TASK_CONFIG);115        TASK_CONFIGS.add(TASK_CONFIG);116        TASK_CONFIGS.add(TASK_CONFIG);117    }118    private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS_MAP = new HashMap<>();119    static {120        TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);121        TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);122        TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);123    }124    private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),125            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),126            TASK_CONFIGS_MAP, Collections.<String>emptySet());127    private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),128            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.PAUSED),129            TASK_CONFIGS_MAP, Collections.<String>emptySet());130    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),131            Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), Collections.singletonMap(CONN1, TargetState.STARTED),132            TASK_CONFIGS_MAP, Collections.<String>emptySet());133    private static final String WORKER_ID = "localhost:8083";134    @Mock private KafkaConfigBackingStore configStorage;135    @Mock private StatusBackingStore statusBackingStore;136    @Mock private WorkerGroupMember member;137    private MockTime time;138    private DistributedHerder herder;139    @Mock private Worker worker;140    @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;141    private ConfigBackingStore.UpdateListener configUpdateListener;142    private 
WorkerRebalanceListener rebalanceListener;143    @Before144    public void setUp() throws Exception {145        worker = PowerMock.createMock(Worker.class);146        EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);147        time = new MockTime();148        herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "updateDeletedConnectorStatus"},149                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configStorage, member, MEMBER_URL, time);150        configUpdateListener = herder.new ConfigUpdateListener();151        rebalanceListener = herder.new RebalanceListener();152        PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();153    }154    @Test155    public void testJoinAssignment() throws Exception {156        // Join group and get assignment157        EasyMock.expect(member.memberId()).andStubReturn("member");158        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));159        expectPostRebalanceCatchup(SNAPSHOT);160        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),161                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));162        PowerMock.expectLastCall();163        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);164        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);165        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));166        PowerMock.expectLastCall();167        member.poll(EasyMock.anyInt());168        PowerMock.expectLastCall();169        PowerMock.replayAll();170        herder.tick();171        PowerMock.verifyAll();172    }173    @Test174    public void testRebalance() throws Exception {175        // Join group and get assignment176        
EasyMock.expect(member.memberId()).andStubReturn("member");177        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));178        expectPostRebalanceCatchup(SNAPSHOT);179        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),180                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));181        PowerMock.expectLastCall();182        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);183        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);184        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));185        PowerMock.expectLastCall();186        member.poll(EasyMock.anyInt());187        PowerMock.expectLastCall();188        expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR,189                1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());190        // and the new assignment started191        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),192                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));193        PowerMock.expectLastCall();194        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);195        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);196        member.poll(EasyMock.anyInt());197        PowerMock.expectLastCall();198        PowerMock.replayAll();199        herder.tick();200        herder.tick();201        PowerMock.verifyAll();202    }203    @Test204    public void testRebalanceFailedConnector() throws Exception {205        // Join group and get assignment206        EasyMock.expect(member.memberId()).andStubReturn("member");207        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));208        expectPostRebalanceCatchup(SNAPSHOT);209        
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),210                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));211        PowerMock.expectLastCall();212        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);213        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);214        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));215        PowerMock.expectLastCall();216        member.poll(EasyMock.anyInt());217        PowerMock.expectLastCall();218        expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR,219                1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());220        // and the new assignment started221        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),222                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));223        PowerMock.expectLastCall();224        EasyMock.expect(worker.isRunning(CONN1)).andReturn(false);225        // worker is not running, so we should see no call to connectorTaskConfigs()226        member.poll(EasyMock.anyInt());227        PowerMock.expectLastCall();228        PowerMock.replayAll();229        herder.tick();230        herder.tick();231        PowerMock.verifyAll();232    }233    @Test234    public void testHaltCleansUpWorker() {235        EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));236        worker.stopConnector(CONN1);237        PowerMock.expectLastCall();238        EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));239        worker.stopTasks(Collections.singleton(TASK1));240        PowerMock.expectLastCall();241        worker.awaitStopTasks(Collections.singleton(TASK1));242        PowerMock.expectLastCall();243        member.stop();244        
PowerMock.expectLastCall();245        configStorage.stop();246        PowerMock.expectLastCall();247        statusBackingStore.stop();248        PowerMock.expectLastCall();249        worker.stop();250        PowerMock.expectLastCall();251        PowerMock.replayAll();252        herder.halt();253        PowerMock.verifyAll();254    }255    @Test256    public void testCreateConnector() throws Exception {257        EasyMock.expect(member.memberId()).andStubReturn("leader");258        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());259        expectPostRebalanceCatchup(SNAPSHOT);260        member.wakeup();261        PowerMock.expectLastCall();262        // CONN2 is new, should succeed263        configStorage.putConnectorConfig(CONN2, CONN2_CONFIG);264        PowerMock.expectLastCall();265        ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.<ConnectorTaskId>emptyList());266        putConnectorCallback.onCompletion(null, new Herder.Created<>(true, info));267        PowerMock.expectLastCall();268        member.poll(EasyMock.anyInt());269        PowerMock.expectLastCall();270        // No immediate action besides this -- change will be picked up via the config log271        PowerMock.replayAll();272        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);273        herder.tick();274        PowerMock.verifyAll();275    }276    @Test277    public void testCreateConnectorAlreadyExists() throws Exception {278        EasyMock.expect(member.memberId()).andStubReturn("leader");279        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());280        expectPostRebalanceCatchup(SNAPSHOT);281        member.wakeup();282        PowerMock.expectLastCall();283        // CONN1 already exists284        putConnectorCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());285        
PowerMock.expectLastCall();286        member.poll(EasyMock.anyInt());287        PowerMock.expectLastCall();288        // No immediate action besides this -- change will be picked up via the config log289        PowerMock.replayAll();290        herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, putConnectorCallback);291        herder.tick();292        PowerMock.verifyAll();293    }294    @Test295    public void testDestroyConnector() throws Exception {296        EasyMock.expect(member.memberId()).andStubReturn("leader");297        // Start with one connector298        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());299        expectPostRebalanceCatchup(SNAPSHOT);300        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),301                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));302        PowerMock.expectLastCall();303        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);304        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);305        // And delete the connector306        member.wakeup();307        PowerMock.expectLastCall();308        configStorage.removeConnectorConfig(CONN1);309        PowerMock.expectLastCall();310        putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null));311        PowerMock.expectLastCall();312        member.poll(EasyMock.anyInt());313        PowerMock.expectLastCall();314        // No immediate action besides this -- change will be picked up via the config log315        PowerMock.replayAll();316        herder.putConnectorConfig(CONN1, null, true, putConnectorCallback);317        herder.tick();318        PowerMock.verifyAll();319    }320    @Test321    public void testRestartConnector() throws Exception {322        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS);323        // get the initial 
assignment324        EasyMock.expect(member.memberId()).andStubReturn("leader");325        expectRebalance(1, Collections.singletonList(CONN1), Collections.<ConnectorTaskId>emptyList());326        expectPostRebalanceCatchup(SNAPSHOT);327        member.poll(EasyMock.anyInt());328        PowerMock.expectLastCall();329        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),330                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));331        PowerMock.expectLastCall();332        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);333        // now handle the connector restart334        member.wakeup();335        PowerMock.expectLastCall();336        member.ensureActive();337        PowerMock.expectLastCall();338        member.poll(EasyMock.anyInt());339        PowerMock.expectLastCall();340        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);341        worker.stopConnector(CONN1);342        PowerMock.expectLastCall();343        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),344                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));345        PowerMock.expectLastCall();346        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);347        PowerMock.replayAll();348        herder.tick();349        FutureCallback<Void> callback = new FutureCallback<>();350        herder.restartConnector(CONN1, callback);351        herder.tick();352        callback.get(1000L, TimeUnit.MILLISECONDS);353        PowerMock.verifyAll();354    }355    @Test356    public void testRestartUnknownConnector() throws Exception {357        // get the initial assignment358        EasyMock.expect(member.memberId()).andStubReturn("leader");359        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());360        expectPostRebalanceCatchup(SNAPSHOT);361        member.poll(EasyMock.anyInt());362        
PowerMock.expectLastCall();363        // now handle the connector restart364        member.wakeup();365        PowerMock.expectLastCall();366        member.ensureActive();367        PowerMock.expectLastCall();368        member.poll(EasyMock.anyInt());369        PowerMock.expectLastCall();370        PowerMock.replayAll();371        herder.tick();372        FutureCallback<Void> callback = new FutureCallback<>();373        herder.restartConnector(CONN2, callback);374        herder.tick();375        try {376            callback.get(1000L, TimeUnit.MILLISECONDS);377            fail("Expected NotLeaderException to be raised");378        } catch (ExecutionException e) {379            assertTrue(e.getCause() instanceof NotFoundException);380        }381        PowerMock.verifyAll();382    }383    @Test384    public void testRestartConnectorRedirectToLeader() throws Exception {385        // get the initial assignment386        EasyMock.expect(member.memberId()).andStubReturn("member");387        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());388        expectPostRebalanceCatchup(SNAPSHOT);389        member.poll(EasyMock.anyInt());390        PowerMock.expectLastCall();391        // now handle the connector restart392        member.wakeup();393        PowerMock.expectLastCall();394        member.ensureActive();395        PowerMock.expectLastCall();396        member.poll(EasyMock.anyInt());397        PowerMock.expectLastCall();398        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);399        PowerMock.replayAll();400        herder.tick();401        FutureCallback<Void> callback = new FutureCallback<>();402        herder.restartConnector(CONN1, callback);403        herder.tick();404        try {405            callback.get(1000L, TimeUnit.MILLISECONDS);406            fail("Expected NotLeaderException to be raised");407        } catch (ExecutionException e) {408            assertTrue(e.getCause() instanceof 
NotLeaderException);409        }410        PowerMock.verifyAll();411    }412    @Test413    public void testRestartConnectorRedirectToOwner() throws Exception {414        // get the initial assignment415        EasyMock.expect(member.memberId()).andStubReturn("leader");416        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());417        expectPostRebalanceCatchup(SNAPSHOT);418        member.poll(EasyMock.anyInt());419        PowerMock.expectLastCall();420        // now handle the connector restart421        member.wakeup();422        PowerMock.expectLastCall();423        member.ensureActive();424        PowerMock.expectLastCall();425        member.poll(EasyMock.anyInt());426        PowerMock.expectLastCall();427        String ownerUrl = "ownerUrl";428        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);429        EasyMock.expect(member.ownerUrl(CONN1)).andReturn(ownerUrl);430        PowerMock.replayAll();431        herder.tick();432        FutureCallback<Void> callback = new FutureCallback<>();433        herder.restartConnector(CONN1, callback);434        herder.tick();435        try {436            callback.get(1000L, TimeUnit.MILLISECONDS);437            fail("Expected NotLeaderException to be raised");438        } catch (ExecutionException e) {439            assertTrue(e.getCause() instanceof NotAssignedException);440            NotAssignedException notAssignedException = (NotAssignedException) e.getCause();441            assertEquals(ownerUrl, notAssignedException.forwardUrl());442        }443        PowerMock.verifyAll();444    }445    @Test446    public void testRestartTask() throws Exception {447        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS);448        // get the initial assignment449        EasyMock.expect(member.memberId()).andStubReturn("leader");450        expectRebalance(1, Collections.<String>emptyList(), 
Collections.singletonList(TASK0));451        expectPostRebalanceCatchup(SNAPSHOT);452        member.poll(EasyMock.anyInt());453        PowerMock.expectLastCall();454        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));455        PowerMock.expectLastCall();456        // now handle the task restart457        member.wakeup();458        PowerMock.expectLastCall();459        member.ensureActive();460        PowerMock.expectLastCall();461        member.poll(EasyMock.anyInt());462        PowerMock.expectLastCall();463        EasyMock.expect(worker.ownsTask(TASK0)).andReturn(true);464        worker.stopAndAwaitTask(TASK0);465        PowerMock.expectLastCall();466        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));467        PowerMock.expectLastCall();468        PowerMock.replayAll();469        herder.tick();470        FutureCallback<Void> callback = new FutureCallback<>();471        herder.restartTask(TASK0, callback);472        herder.tick();473        callback.get(1000L, TimeUnit.MILLISECONDS);474        PowerMock.verifyAll();475    }476    @Test477    public void testRestartUnknownTask() throws Exception {478        // get the initial assignment479        EasyMock.expect(member.memberId()).andStubReturn("member");480        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());481        expectPostRebalanceCatchup(SNAPSHOT);482        member.poll(EasyMock.anyInt());483        PowerMock.expectLastCall();484        member.wakeup();485        PowerMock.expectLastCall();486        member.ensureActive();487        PowerMock.expectLastCall();488        member.poll(EasyMock.anyInt());489        PowerMock.expectLastCall();490        PowerMock.replayAll();491        FutureCallback<Void> callback = new FutureCallback<>();492        herder.tick();493        herder.restartTask(new 
ConnectorTaskId("blah", 0), callback);494        herder.tick();495        try {496            callback.get(1000L, TimeUnit.MILLISECONDS);497            fail("Expected NotLeaderException to be raised");498        } catch (ExecutionException e) {499            assertTrue(e.getCause() instanceof NotFoundException);500        }501        PowerMock.verifyAll();502    }503    @Test504    public void testRestartTaskRedirectToLeader() throws Exception {505        // get the initial assignment506        EasyMock.expect(member.memberId()).andStubReturn("member");507        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());508        expectPostRebalanceCatchup(SNAPSHOT);509        member.poll(EasyMock.anyInt());510        PowerMock.expectLastCall();511        // now handle the task restart512        EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false);513        member.wakeup();514        PowerMock.expectLastCall();515        member.ensureActive();516        PowerMock.expectLastCall();517        member.poll(EasyMock.anyInt());518        PowerMock.expectLastCall();519        PowerMock.replayAll();520        herder.tick();521        FutureCallback<Void> callback = new FutureCallback<>();522        herder.restartTask(TASK0, callback);523        herder.tick();524        try {525            callback.get(1000L, TimeUnit.MILLISECONDS);526            fail("Expected NotLeaderException to be raised");527        } catch (ExecutionException e) {528            assertTrue(e.getCause() instanceof NotLeaderException);529        }530        PowerMock.verifyAll();531    }532    @Test533    public void testRestartTaskRedirectToOwner() throws Exception {534        // get the initial assignment535        EasyMock.expect(member.memberId()).andStubReturn("leader");536        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());537        expectPostRebalanceCatchup(SNAPSHOT);538        
member.poll(EasyMock.anyInt());539        PowerMock.expectLastCall();540        // now handle the task restart541        String ownerUrl = "ownerUrl";542        EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false);543        EasyMock.expect(member.ownerUrl(TASK0)).andReturn(ownerUrl);544        member.wakeup();545        PowerMock.expectLastCall();546        member.ensureActive();547        PowerMock.expectLastCall();548        member.poll(EasyMock.anyInt());549        PowerMock.expectLastCall();550        PowerMock.replayAll();551        herder.tick();552        FutureCallback<Void> callback = new FutureCallback<>();553        herder.restartTask(TASK0, callback);554        herder.tick();555        try {556            callback.get(1000L, TimeUnit.MILLISECONDS);557            fail("Expected NotLeaderException to be raised");558        } catch (ExecutionException e) {559            assertTrue(e.getCause() instanceof NotAssignedException);560            NotAssignedException notAssignedException = (NotAssignedException) e.getCause();561            assertEquals(ownerUrl, notAssignedException.forwardUrl());562        }563        PowerMock.verifyAll();564    }565    @Test566    public void testConnectorConfigAdded() {567        // If a connector was added, we need to rebalance568        EasyMock.expect(member.memberId()).andStubReturn("member");569        // join, no configs so no need to catch up on config topic570        expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());571        member.poll(EasyMock.anyInt());572        PowerMock.expectLastCall();573        // apply config574        member.wakeup();575        member.ensureActive();576        PowerMock.expectLastCall();577        // Checks for config updates and starts rebalance578        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);579        member.requestRejoin();580        PowerMock.expectLastCall();581        // Performs rebalance and gets new 
assignment582        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),583                ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());584        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),585                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));586        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);587        PowerMock.expectLastCall();588        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);589        member.poll(EasyMock.anyInt());590        PowerMock.expectLastCall();591        PowerMock.replayAll();592        herder.tick(); // join593        configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config594        herder.tick(); // apply config595        herder.tick(); // do rebalance596        PowerMock.verifyAll();597    }598    @Test599    public void testConnectorConfigUpdate() throws Exception {600        // Connector config can be applied without any rebalance601        EasyMock.expect(member.memberId()).andStubReturn("member");602        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));603        // join604        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());605        expectPostRebalanceCatchup(SNAPSHOT);606        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),607                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));608        PowerMock.expectLastCall();609        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);610        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);611        member.poll(EasyMock.anyInt());612        PowerMock.expectLastCall();613        // apply config614        member.wakeup();615        
member.ensureActive();616        PowerMock.expectLastCall();617        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot618        worker.stopConnector(CONN1);619        PowerMock.expectLastCall();620        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),621                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));622        PowerMock.expectLastCall();623        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);624        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);625        member.poll(EasyMock.anyInt());626        PowerMock.expectLastCall();627        PowerMock.replayAll();628        herder.tick(); // join629        configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config630        herder.tick(); // apply config631        PowerMock.verifyAll();632    }633    @Test634    public void testConnectorPaused() throws Exception {635        // ensure that target state changes are propagated to the worker636        EasyMock.expect(member.memberId()).andStubReturn("member");637        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));638        // join639        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());640        expectPostRebalanceCatchup(SNAPSHOT);641        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),642                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));643        PowerMock.expectLastCall();644        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);645        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);646        member.poll(EasyMock.anyInt());647        PowerMock.expectLastCall();648        // handle the state change649        
member.wakeup();650        member.ensureActive();651        PowerMock.expectLastCall();652        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);653        PowerMock.expectLastCall();654        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);655        worker.setTargetState(CONN1, TargetState.PAUSED);656        PowerMock.expectLastCall();657        member.poll(EasyMock.anyInt());658        PowerMock.expectLastCall();659        PowerMock.replayAll();660        herder.tick(); // join661        configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to paused662        herder.tick(); // worker should apply the state change663        PowerMock.verifyAll();664    }665    @Test666    public void testConnectorResumed() throws Exception {667        EasyMock.expect(member.memberId()).andStubReturn("member");668        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));669        // start with the connector paused670        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());671        expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);672        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),673                EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));674        PowerMock.expectLastCall();675        member.poll(EasyMock.anyInt());676        PowerMock.expectLastCall();677        // handle the state change678        member.wakeup();679        member.ensureActive();680        PowerMock.expectLastCall();681        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);682        PowerMock.expectLastCall();683        // we expect reconfiguration after resuming684        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);685        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);686        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);687        worker.setTargetState(CONN1, TargetState.STARTED);688        PowerMock.expectLastCall();689        member.poll(EasyMock.anyInt());690        PowerMock.expectLastCall();691        PowerMock.replayAll();692        herder.tick(); // join693        configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to started694        herder.tick(); // apply state change695        PowerMock.verifyAll();696    }697    @Test698    public void testUnknownConnectorPaused() throws Exception {699        EasyMock.expect(member.memberId()).andStubReturn("member");700        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));701        // join702        expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));703        expectPostRebalanceCatchup(SNAPSHOT);704        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));705        PowerMock.expectLastCall();706        member.poll(EasyMock.anyInt());707        PowerMock.expectLastCall();708        // state change is ignored since we have no target state709        member.wakeup();710        member.ensureActive();711        PowerMock.expectLastCall();712        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);713        PowerMock.expectLastCall();714        member.poll(EasyMock.anyInt());715        PowerMock.expectLastCall();716        PowerMock.replayAll();717        herder.tick(); // join718        configUpdateListener.onConnectorTargetStateChange("unknown-connector");719        herder.tick(); // continue720        PowerMock.verifyAll();721    }722    @Test723    public void testConnectorPausedRunningTaskOnly() throws Exception {724        // even if we don't own the connector, we should still propagate target state725        // changes to the worker so that tasks will transition correctly726        
EasyMock.expect(member.memberId()).andStubReturn("member");727        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet());728        // join729        expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));730        expectPostRebalanceCatchup(SNAPSHOT);731        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));732        PowerMock.expectLastCall();733        member.poll(EasyMock.anyInt());734        PowerMock.expectLastCall();735        // handle the state change736        member.wakeup();737        member.ensureActive();738        PowerMock.expectLastCall();739        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);740        PowerMock.expectLastCall();741        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);742        worker.setTargetState(CONN1, TargetState.PAUSED);743        PowerMock.expectLastCall();744        member.poll(EasyMock.anyInt());745        PowerMock.expectLastCall();746        PowerMock.replayAll();747        herder.tick(); // join748        configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to paused749        herder.tick(); // apply state change750        PowerMock.verifyAll();751    }752    @Test753    public void testConnectorResumedRunningTaskOnly() throws Exception {754        // even if we don't own the connector, we should still propagate target state755        // changes to the worker so that tasks will transition correctly756        EasyMock.expect(member.memberId()).andStubReturn("member");757        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet());758        // join759        expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));760        expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);761        worker.startTask(EasyMock.eq(TASK0), 
EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));762        PowerMock.expectLastCall();763        member.poll(EasyMock.anyInt());764        PowerMock.expectLastCall();765        // handle the state change766        member.wakeup();767        member.ensureActive();768        PowerMock.expectLastCall();769        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);770        PowerMock.expectLastCall();771        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);772        worker.setTargetState(CONN1, TargetState.STARTED);773        PowerMock.expectLastCall();774        member.poll(EasyMock.anyInt());775        PowerMock.expectLastCall();776        PowerMock.replayAll();777        herder.tick(); // join778        configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to paused779        herder.tick(); // apply state change780        PowerMock.verifyAll();781    }782    @Test783    public void testTaskConfigAdded() {784        // Task config always requires rebalance785        EasyMock.expect(member.memberId()).andStubReturn("member");786        // join787        expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());788        member.poll(EasyMock.anyInt());789        PowerMock.expectLastCall();790        // apply config791        member.wakeup();792        member.ensureActive();793        PowerMock.expectLastCall();794        // Checks for config updates and starts rebalance795        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);796        member.requestRejoin();797        PowerMock.expectLastCall();798        // Performs rebalance and gets new assignment799        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),800                ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),801                Arrays.asList(TASK0));802        worker.startTask(EasyMock.eq(TASK0), 
EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));803        PowerMock.expectLastCall();804        member.poll(EasyMock.anyInt());805        PowerMock.expectLastCall();806        PowerMock.replayAll();807        herder.tick(); // join808        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK0, TASK1, TASK2)); // read updated config809        herder.tick(); // apply config810        herder.tick(); // do rebalance811        PowerMock.verifyAll();812    }813    @Test814    public void testJoinLeaderCatchUpFails() throws Exception {815        // Join group and as leader fail to do assignment816        EasyMock.expect(member.memberId()).andStubReturn("leader");817        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),818                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),819                Collections.<ConnectorTaskId>emptyList());820        // Reading to end of log times out821        configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));822        EasyMock.expectLastCall().andThrow(new TimeoutException());823        member.maybeLeaveGroup();824        EasyMock.expectLastCall();825        PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);826        member.requestRejoin();827        // After backoff, restart the process and this time succeed828        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));829        expectPostRebalanceCatchup(SNAPSHOT);830        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),831                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));832        PowerMock.expectLastCall();833        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);834        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), 
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));835        PowerMock.expectLastCall();836        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);837        member.poll(EasyMock.anyInt());838        PowerMock.expectLastCall();839        PowerMock.replayAll();840        herder.tick();841        herder.tick();842        PowerMock.verifyAll();843    }844    @Test845    public void testAccessors() throws Exception {846        EasyMock.expect(member.memberId()).andStubReturn("leader");847        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());848        expectPostRebalanceCatchup(SNAPSHOT);849        member.wakeup();850        PowerMock.expectLastCall().anyTimes();851        // list connectors, get connector info, get connector config, get task configs852        member.poll(EasyMock.anyInt());853        PowerMock.expectLastCall();854        PowerMock.replayAll();855        FutureCallback<Collection<String>> listConnectorsCb = new FutureCallback<>();856        herder.connectors(listConnectorsCb);857        FutureCallback<ConnectorInfo> connectorInfoCb = new FutureCallback<>();858        herder.connectorInfo(CONN1, connectorInfoCb);859        FutureCallback<Map<String, String>> connectorConfigCb = new FutureCallback<>();860        herder.connectorConfig(CONN1, connectorConfigCb);861        FutureCallback<List<TaskInfo>> taskConfigsCb = new FutureCallback<>();862        herder.taskConfigs(CONN1, taskConfigsCb);863        herder.tick();864        assertTrue(listConnectorsCb.isDone());865        assertEquals(Collections.singleton(CONN1), listConnectorsCb.get());866        assertTrue(connectorInfoCb.isDone());867        ConnectorInfo info = new ConnectorInfo(CONN1, CONN1_CONFIG, Arrays.asList(TASK0, TASK1, TASK2));868        assertEquals(info, connectorInfoCb.get());869        assertTrue(connectorConfigCb.isDone());870        assertEquals(CONN1_CONFIG, connectorConfigCb.get());871        
assertTrue(taskConfigsCb.isDone());872        assertEquals(Arrays.asList(873                        new TaskInfo(TASK0, TASK_CONFIG),874                        new TaskInfo(TASK1, TASK_CONFIG),875                        new TaskInfo(TASK2, TASK_CONFIG)),876                taskConfigsCb.get());877        PowerMock.verifyAll();878    }879    @Test880    public void testPutConnectorConfig() throws Exception {881        EasyMock.expect(member.memberId()).andStubReturn("leader");882        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());883        expectPostRebalanceCatchup(SNAPSHOT);884        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),885                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));886        PowerMock.expectLastCall();887        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);888        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);889        // list connectors, get connector info, get connector config, get task configs890        member.wakeup();891        PowerMock.expectLastCall().anyTimes();892        member.poll(EasyMock.anyInt());893        PowerMock.expectLastCall();894        // Poll loop for second round of calls895        member.ensureActive();896        PowerMock.expectLastCall();897        configStorage.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);898        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {899            @Override900            public Object answer() throws Throwable {901                // Simulate response to writing config + waiting until end of log to be read902                configUpdateListener.onConnectorConfigUpdate(CONN1);903                return null;904            }905        });906        // As a result of reconfig, should need to update snapshot. 
With only connector updates, we'll just restart907        // connector without rebalance908        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);909        worker.stopConnector(CONN1);910        PowerMock.expectLastCall();911        Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture();912        worker.startConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject(),913                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));914        PowerMock.expectLastCall();915        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);916        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);917        member.poll(EasyMock.anyInt());918        PowerMock.expectLastCall();919        // Third tick just to read the config920        member.ensureActive();921        PowerMock.expectLastCall();922        member.poll(EasyMock.anyInt());923        PowerMock.expectLastCall();924        PowerMock.replayAll();925        // Should pick up original config926        FutureCallback<Map<String, String>> connectorConfigCb = new FutureCallback<>();927        herder.connectorConfig(CONN1, connectorConfigCb);928        herder.tick();929        assertTrue(connectorConfigCb.isDone());930        assertEquals(CONN1_CONFIG, connectorConfigCb.get());931        // Apply new config.932        FutureCallback<Herder.Created<ConnectorInfo>> putConfigCb = new FutureCallback<>();933        herder.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED, true, putConfigCb);934        herder.tick();935        assertTrue(putConfigCb.isDone());936        ConnectorInfo updatedInfo = new ConnectorInfo(CONN1, CONN1_CONFIG_UPDATED, Arrays.asList(TASK0, TASK1, TASK2));937        assertEquals(new Herder.Created<>(false, updatedInfo), putConfigCb.get());938        // Check config again to validate change939        connectorConfigCb = new FutureCallback<>();940        
herder.connectorConfig(CONN1, connectorConfigCb);941        herder.tick();942        assertTrue(connectorConfigCb.isDone());943        assertEquals(CONN1_CONFIG_UPDATED, connectorConfigCb.get());944        PowerMock.verifyAll();945    }946    @Test947    public void testInconsistentConfigs() throws Exception {948        // FIXME: if we have inconsistent configs, we need to request forced reconfig + write of the connector's task configs949        // This requires inter-worker communication, so needs the REST API950    }951    private void expectRebalance(final long offset,952                                 final List<String> assignedConnectors,953                                 final List<ConnectorTaskId> assignedTasks) {954        expectRebalance(null, null, ConnectProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks);955    }956    // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.957    private void expectRebalance(final Collection<String> revokedConnectors,958                                 final List<ConnectorTaskId> revokedTasks,959                                 final short error,960                                 final long offset,961                                 final List<String> assignedConnectors,962                                 final List<ConnectorTaskId> assignedTasks) {963        member.ensureActive();964        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {965            @Override966            public Object answer() throws Throwable {967                if (revokedConnectors != null)968                    rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);969                ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(970                        error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks);971                rebalanceListener.onAssigned(assignment, 0);972                return 
null;973            }974        });975        if (revokedConnectors != null) {976            for (String connector : revokedConnectors) {977                worker.stopConnector(connector);978                PowerMock.expectLastCall();979            }980        }981        if (revokedTasks != null && !revokedTasks.isEmpty()) {982            worker.stopTasks(revokedTasks);983            PowerMock.expectLastCall();984            worker.awaitStopTasks(revokedTasks);985            PowerMock.expectLastCall();986        }987        if (revokedConnectors != null) {988            statusBackingStore.flush();989            PowerMock.expectLastCall();990        }991        member.wakeup();992        PowerMock.expectLastCall();993    }994    private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) throws TimeoutException {995        configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));996        EasyMock.expectLastCall();997        EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot);998    }999    // We need to use a real class here due to some issue with mocking java.lang.Class1000    private abstract class BogusSourceConnector extends SourceConnector {1001    }1002    private abstract class BogusSourceTask extends SourceTask {1003    }1004}...Source:SourceTaskOffsetCommitterTest.java  
...22import org.easymock.Capture;23import org.easymock.EasyMock;24import org.junit.Test;25import org.junit.runner.RunWith;26import org.powermock.api.easymock.PowerMock;27import org.powermock.api.easymock.annotation.Mock;28import org.powermock.core.classloader.annotations.PrepareForTest;29import org.powermock.modules.junit4.PowerMockRunner;30import org.powermock.reflect.Whitebox;31import org.slf4j.Logger;32import org.slf4j.LoggerFactory;33import java.util.HashMap;34import java.util.Map;35import java.util.concurrent.CancellationException;36import java.util.concurrent.ConcurrentHashMap;37import java.util.concurrent.ScheduledExecutorService;38import java.util.concurrent.ScheduledFuture;39import java.util.concurrent.TimeUnit;40import static org.easymock.EasyMock.eq;41import static org.junit.Assert.assertNotNull;42import static org.junit.Assert.assertTrue;43import static org.junit.Assert.fail;44@RunWith(PowerMockRunner.class)45@PrepareForTest({SourceTaskOffsetCommitter.class, LoggerFactory.class})46public class SourceTaskOffsetCommitterTest extends ThreadedTest {47    @Mock48    private ScheduledExecutorService executor;49    @Mock50    private ConcurrentHashMap committers;51    @Mock52    private Logger mockLog;53    private SourceTaskOffsetCommitter committer;54    private static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 1000;55    @Override56    public void setup() {57        super.setup();58        Map<String, String> workerProps = new HashMap<>();59        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");60        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");61        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");62        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");63        workerProps.put("internal.key.converter.schemas.enable", "false");64        
workerProps.put("internal.value.converter.schemas.enable", "false");65        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");66        workerProps.put("offset.flush.interval.ms",67                Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));68        WorkerConfig config = new StandaloneConfig(workerProps);69        committer = new SourceTaskOffsetCommitter(config, executor, committers);70        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);71    }72    @Test73    public void testSchedule() throws Exception {74        Capture<Runnable> taskWrapper = EasyMock.newCapture();75        ScheduledFuture commitFuture = PowerMock.createMock(ScheduledFuture.class);76        EasyMock.expect(executor.scheduleWithFixedDelay(77                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),78                eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))79        ).andReturn(commitFuture);80        ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class);81        WorkerSourceTask task = PowerMock.createMock(WorkerSourceTask.class);82        EasyMock.expect(committers.put(taskId, commitFuture)).andReturn(null);83        PowerMock.replayAll();84        committer.schedule(taskId, task);85        assertTrue(taskWrapper.hasCaptured());86        assertNotNull(taskWrapper.getValue());87        PowerMock.verifyAll();88    }89    @Test90    public void testClose() throws Exception {91        long timeoutMs = 1000;92        // Normal termination, where termination times out.93        executor.shutdown();94        PowerMock.expectLastCall();95        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))96                .andReturn(false);97        mockLog.error(EasyMock.anyString());98        PowerMock.expectLastCall();99        PowerMock.replayAll();100        committer.close(timeoutMs);101        PowerMock.verifyAll();102        
PowerMock.resetAll();103        // Termination interrupted104        executor.shutdown();105        PowerMock.expectLastCall();106        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))107                .andThrow(new InterruptedException());108        PowerMock.replayAll();109        committer.close(timeoutMs);110        PowerMock.verifyAll();111    }112    @Test113    public void testRemove() throws Exception {114        ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class);115        ScheduledFuture task = PowerMock.createMock(ScheduledFuture.class);116        // Try to remove a non-existing task117        EasyMock.expect(committers.remove(taskId)).andReturn(null);118        PowerMock.replayAll();119        committer.remove(taskId);120        PowerMock.verifyAll();121        PowerMock.resetAll();122        // Try to remove an existing task123        EasyMock.expect(committers.remove(taskId)).andReturn(task);124        EasyMock.expect(task.cancel(eq(false))).andReturn(false);125        EasyMock.expect(task.isDone()).andReturn(false);126        EasyMock.expect(task.get()).andReturn(null);127        PowerMock.replayAll();128        committer.remove(taskId);129        PowerMock.verifyAll();130        PowerMock.resetAll();131        // Try to remove a cancelled task132        EasyMock.expect(committers.remove(taskId)).andReturn(task);133        EasyMock.expect(task.cancel(eq(false))).andReturn(false);134        EasyMock.expect(task.isDone()).andReturn(false);135        EasyMock.expect(task.get()).andThrow(new CancellationException());136        mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject());137        PowerMock.expectLastCall();138        PowerMock.replayAll();139        committer.remove(taskId);140        PowerMock.verifyAll();141        PowerMock.resetAll();142        // Try to remove an interrupted task143        EasyMock.expect(committers.remove(taskId)).andReturn(task);144        
EasyMock.expect(task.cancel(eq(false))).andReturn(false);145        EasyMock.expect(task.isDone()).andReturn(false);146        EasyMock.expect(task.get()).andThrow(new InterruptedException());147        PowerMock.replayAll();148        try {149            committer.remove(taskId);150            fail("Expected ConnectException to be raised");151        } catch (ConnectException e) {152            //ignore153        }154        PowerMock.verifyAll();155    }156}...PowerMock
Using AI Code Generation
1import org.powermock.api.easymock.PowerMock;2import org.powermock.api.easymock.annotation.Mock;3import org.powermock.api.easymock.annotation.MockNice;4import org.powermock.api.easymock.annotation.MockStrict;5import org.powermock.api.easymock.annotation.TestSubject;6import org.powermock.core.classloader.annotations.PrepareForTest;7@PrepareForTest({ Class1.class, Class2.class })8public class 4 {9	private Class1 class1 = new Class1();10	private Class2 class2;11	private Class3 class3;12	private Class4 class4;13	public void setUp() {14		PowerMock.mockStatic(Class2.class);15	}16	public void test() {17	}18}19import org.powermock.api.easymock.PowerMock;20import org.powermock.api.easymock.annotation.Mock;21import org.powermock.api.easymock.annotation.MockNice;22import org.powermock.api.easymock.annotation.MockStrict;23import org.powermock.api.easymock.annotation.TestSubject;24import org.powermock.core.classloader.annotations.PrepareForTest;25@PrepareForTest({ Class1.class, Class2.class })26public class 5 {27	private Class1 class1 = new Class1();28	private Class2 class2;29	private Class3 class3;30	private Class4 class4;31	public void setUp() {32		PowerMock.mockStatic(Class2.class);33	}34	public void test() {35	}36}37import org.powermock.api.easymock.PowerMock;38import org.powermock.api.easymock.annotation.Mock;39import org.powermock.api.easymock.annotation.MockNice;40import org.powermock.api.easymock.annotation.MockStrict;41import org.powermock.api.easymock.annotation.TestSubject;42import org.powermock.core.classloader.annotations.PrepareForTest;43@PrepareForTest({ Class1.class, Class2.class })PowerMock
Using AI Code Generation
1import org.powermock.api.easymock.PowerMock;2import org.powermock.core.classloader.annotations.PrepareForTest;3import org.powermock.modules.junit4.PowerMockRunner;4import org.powermock.modules.junit4.PowerMockRunnerDelegate;5import org.junit.runner.RunWith;6import org.junit.Test;7import org.junit.Before;8import org.junit.After;9import org.junit.Assert;10import org.junit.runner.JUnitCore;11import org.junit.runner.Result;12import org.junit.runner.notification.Failure;13import org.junit.runners.JUnit4;14import org.junit.runner.Description;15import org.junit.runner.Runner;16import org.junit.runner.notification.RunNotifier;17import org.junit.runners.model.RunnerBuilder;18import org.junit.runners.model.InitializationError;19import org.junit.runners.model.Statement;20import org.junit.runners.model.FrameworkMethod;21import java.lang.reflect.Method;22import java.lang.reflect.InvocationTargetException;23import java.lang.reflect.Constructor;24import java.util.List;25import java.util.ArrayList;26import java.util.Arrays;27import java.util.Collection;28import java.util.Collections;29import java.util.Comparator;30import java.util.Iterator;31import java.util.LinkedList;32import java.util.concurrent.Callable;33import java.util.concurrent.ExecutorService;34import java.util.concurrent.Executors;35import java.util.concurrent.Future;36import java.util.concurrent.TimeUnit;37import java.util.concurrent.TimeoutException;38import java.util.concurrent.ExecutionException;39import java.util.concurrent.atomic.AtomicReference;40import java.util.concurrent.atomic.AtomicBoolean;41import java.lang.reflect.Field;42import java.lang.reflect.Modifier;43import java.lang.reflect.Type;44import java.lang.reflect.ParameterizedType;45import java.lang.reflect.TypeVariable;46import java.lang.reflect.GenericArrayType;47import java.lang.reflect.GenericDeclaration;48import java.lang.reflect.WildcardType;49import java.lang.reflect.AccessibleObject;50import java.lang.reflect.Constructor;51import 
java.lang.reflect.Method;52import java.lang.reflect.Parameter;53import java.lang.reflect.Proxy;54import java.lang.reflect.UndeclaredThrowableException;55import java.lang.reflect.InvocationHandler;56import java.lang.reflect.InvocationTargetException;57import java.lang.reflect.Array;58import java.lang.reflect.TypeVariable;59import java.lang.reflect.Field;60import java.lang.reflect.Modifier;61import java.lang.reflect.Type;62import java.lang.reflect.ParameterizedType;63import java.lang.reflect.TypeVariable;64import java.lang.reflect.GenericArrayType;65import java.lang.reflect.GenericDeclaration;66import java.lang.reflect.WildcardType;67import java.lang.reflect.AccessibleObject;68import java.lang.reflect.Constructor;69import java.lang.reflect.Method;70import java.lang.reflect.Parameter;71import java.lang.reflect.Proxy;PowerMock
Using AI Code Generation
1import org.powermock.api.easymock.PowerMock;2import org.powermock.api.easymock.annotation.Mock;3import org.powermock.modules.junit4.PowerMockRunner;4import org.powermock.modules.junit4.PowerMockRunnerDelegate;5import org.powermock.reflect.Whitebox;6import org.junit.Test;7import org.junit.runner.RunWith;8import org.powermock.core.classloader.annotations.PrepareForTest;9import org.powermock.modules.junit4.PowerMockRunner;10import org.powermock.modules.junit4.PowerMockRunnerDelegate;11import org.powermock.modules.junit4.rule.PowerMockRule;12import org.powermock.modules.junit4.rule.PowerMockRule;13import org.powermock.reflect.Whitebox;14import org.powermock.reflect.internal.WhPowerMock
Using AI Code Generation
1package com.mycompany.app;2import static org.powermock.api.easymock.PowerMock.*;3import org.powermock.api.easymock.annotation.Mock;4import org.powermock.api.easymock.annotation.MockNice;5import org.powermock.api.easymock.annotation.MockStrict;6import org.powermock.api.easymock.annotation.MockUp;7import org.powermock.api.easymock.annotation.Mocked;8import org.powermock.api.easymock.annotation.Tested;9import org.powermock.api.easymock.internal.MockGateway;10import org.powermock.api.easymock.internal.expectation.AssertionErrorCollector;11import org.powermock.api.easymock.internal.expectation.ConstructorExpectation;12import org.powermock.api.easymock.internal.expectation.ConstructorExpectationBuilder;13import org.powermock.api.easymock.internal.expectation.ConstructorInvocation;14import org.powermock.api.easymock.internal.expectation.ConstructorInvocationBuilder;15import org.powermock.api.easymock.internal.expectation.ConstructorInvocationMatcher;16import org.powermock.api.easymock.internal.expectation.ConstructorInvocationMatcherBuilder;17import org.powermock.api.easymock.internal.expectation.ConstructorInvocationMatcherFactory;18import org.powermock.api.easymock.internal.expectation.ConstructorUnexpectedInvocationError;19import org.powermock.api.easymock.internal.expectation.ConstructorUnexpectedInvocationErrorFactory;20import org.powermock.api.easymock.internal.expectation.DefaultAssertionErrorCollector;21import org.powermock.api.easymock.internal.expectation.DefaultConstructorUnexpectedInvocationErrorFactory;22import org.powermock.api.easymock.internal.expectation.DefaultExpectationBuilder;23import org.powermock.api.easymock.internal.expectation.DefaultInvocationBuilder;24import org.powermock.api.easymock.internal.expectation.DefaultInvocationMatcherBuilder;25import org.powermock.api.easymock.internal.expectation.DefaultInvocationMatcherFactory;26import org.powermock.api.easymock.internal.expectation.DefaultMethodUnexpectedInvocationErrorFactory;27import 
org.powermock.api.easymock.internal.expectation.DefaultUnexpectedInvocationErrorFactory;28import org.powermock.api.easymock.internal.expectation.Expectation;29import org.powermock.api.easymock.internal.expectation.ExpectationBuilder;30import org.powermock.api.easymock.internal.expectation.Invocation;31import org.powermock.api.easymock.internal.expectation.InvocationBuilder;32import org.powermock.api.easymock.internal.expectationPowerMock
Using AI Code Generation
1package org.powermock.examples.tutorial.easymock;2import static org.easymock.EasyMock.*;3import static org.junit.Assert.*;4import static org.powermock.api.easymock.PowerMock.*;5import org.junit.Test;6public class Example4Test {7	public void testPrivateMethod() throws Exception {8		Example4 example4 = createMock(Example4.class);9		expectPrivate(example4, "privateMethod", "Hello", "World").andReturn("Hello World");10		replayAll();11		String returnValue = example4.publicMethod("Hello", "World");12		verifyAll();13		assertEquals("Hello World", returnValue);14	}15}16package org.powermock.examples.tutorial.easymock;17import static org.easymock.EasyMock.*;18import static org.junit.Assert.*;19import static org.powermock.api.easymock.PowerMock.*;20import org.junit.Test;21public class Example5Test {22	public void testPrivateMethod() throws Exception {23		Example5 example5 = createMock(Example5.class);24		expectPrivate(example5, "privateMethod", "Hello", "World").andReturn("Hello World");25		replayAll();26		String returnValue = example5.publicMethod("Hello", "World");27		verifyAll();28		assertEquals("Hello World", returnValue);29	}30}31package org.powermock.examples.tutorial.easymock;32import static org.easymock.EasyMock.*;33import static org.junit.Assert.*;34import static org.powermock.api.easymock.PowerMock.*;35import org.junit.Test;36public class Example6Test {37	public void testPrivateMethod() throws Exception {38		Example6 example6 = createMock(Example6.class);39		expectPrivate(example6, "privateMethod", "PowerMock
Using AI Code Generation
1import org.powermock.api.easymock.PowerMock;2import org.junit.Test;3import org.junit.runner.RunWith;4import org.powermock.modules.junit4.PowerMockRunner;5@RunWith(PowerMockRunner.class)6public class TestPowerMock {7public void testPowerMock() {8PowerMock.expectNew(String.class, "test").andReturn("test");9PowerMock.replay(String.class);10String str = new String("test");11assertEquals("test", str);12PowerMock.verify(String.class);13}14}15C:\Users\kumar\workspace\demo>javac -cp C:\Users\kumar\workspace\demo\lib\junit-4.0.jar;C:\Users\kumar\workspace\demo\lib\powermock-api-easymock-1.5.5.jar;C:\Users\kumar\workspace\demo\lib\powermock-core-1.5.5.jar;C:\Users\kumar\workspace\demo\lib\powermock-module-junit4-1.5.5.jar;C:\Users\kumar\workspace\demo\lib\objenesis-1.2.jar;C:\Users\kumar\workspace\demo\lib\cglib-nodep-2.2.2.jar;C:\Users\kumar\workspace\demo\lib\asm-3.3.1.jar 4.java16C:\Users\kumar\workspace\demo>java -cp C:\Users\kumar\workspace\demo;C:\Users\kumar\workspace\demo\lib\junit-4.0.jar;C:\Users\kumar\workspace\demo\lib\powermock-api-easymock-1.5.5.jar;C:\Users\kumar\workspace\demo\lib\powermock-core-1.5.5.jar;C:\Users\kumar\workspace\demo\lib\powermock-module-junit4-1.5.5.jar;C:\Users\kumar\workspace\demo\lib\objenesis-1.2.jar;C:\Users\kumar\workspace\demo\lib\cglib-nodep-2.2.2.jar;C:\Users\kumar\workspace\demo\lib\asm-3.3.1.jar org.junit.runner.JUnitCore TestPowerMockPowerMock
Using AI Code Generation
1import static org.powermock.api.easymock.PowerMock.*;2import org.easymock.EasyMock;3import org.junit.Test;4public class PowerMockExample {5    public void test() {6        MyDependency dependency = createMock(MyDependency.class);7        expect(dependency.greet()).andReturn("Hello world!");8        replay(dependency);9        MyClass tester = new MyClass();10        tester.setDependency(dependency);11        assertEquals("Hello world!", tester.greetDependency());12        verify(dependency);13    }14}15import static org.powermock.api.mockito.PowerMockito.*;16import org.junit.Test;17import org.mockito.Mockito;18public class PowerMockExample {19    public void test() {20        MyDependency dependency = Mockito.mock(MyDependency.class);21        when(dependency.greet()).thenReturn("Hello world!");22        MyClass tester = new MyClass();23        tester.setDependency(dependency);24        assertEquals("Hello world!", tester.greetDependency());25        verify(dependency).greet();26    }27}28import static org.powermock.api.support.membermodification.MemberMatcher.*;29import static org.powermock.api.support.membermodification.MemberModifier.*;30import java.lang.reflect.Method;31import org.junit.Test;32public class PowerMockExample {33    public void test() {34        MyDependency dependency = new MyDependency();35        stub(method(MyDependency.class, "greet")).toReturn("Hello world!");36        MyClass tester = new MyClass();37        tester.setDependency(dependency);38        assertEquals("Hello world!", tester.greetDependency());39        Method method = findMethod(MyDependency.class, "greet");40        assertTrue("Method greet() was not called", method.wasCalled());41    }42}PowerMock
Using AI Code Generation
1import static org.powermock.api.easymock.PowerMock.*;2import org.easymock.EasyMock;3import org.junit.Test;4import static org.junit.Assert.*;5public class PowerMockTest {6    public void testMockStatic() {7        ClassToMock mock = createMock(ClassToMock.class);8        expectStatic(ClassToMock.class, "staticMethod", EasyMock.anyObject(), EasyMock.anyObject());9        expectLastCall().andReturn(10);10        replayAll();11        int result = ClassToMock.staticMethod(1, 2);12        assertEquals(10, result);13        verifyAll();14    }15}16import static org.powermock.api.easymock.PowerMock.*;17import org.easymock.EasyMock;18import org.junit.Test;19import static org.junit.Assert.*;20public class PowerMockTest {21    public void testMockConstructor() throws Exception {22        ClassToMock mock = createMock(ClassToMock.class);23        expectNew(ClassToMock.class, EasyMock.anyObject(), EasyMock.anyObject());24        expectLastCall().andReturn(mock);25        replayAll();26        ClassToMock result = new ClassToMock(1, 2);27        verifyAll();28    }29}30import static org.powermock.api.easymock.PowerMock.*;31import org.easymock.EasyMock;32import org.junit.Test;33import static org.junit.Assert.*;34public class PowerMockTest {35    public void testMockPrivateMethod() throws Exception {PowerMock
Using AI Code Generation
1package com.powermock;2import static org.easymock.EasyMock.expect;3import static org.powermock.api.easymock.PowerMock.expectNew;4import static org.powermock.api.easymock.PowerMock.replay;5import static org.powermock.api.easymock.PowerMock.verifyAll;6import static org.powermock.api.easymock.PowerMock.verifyNoMoreInteractions;7import org.easymock.EasyMock;8import org.junit.Test;9import org.junit.runner.RunWith;10import org.powermock.api.easymock.annotation.Mock;11import org.powermock.api.easymock.annotation.MockNice;12import org.powermock.api.easymock.annotation.MockStrict;13import org.powermock.core.classloader.annotations.PrepareForTest;14import org.powermock.modules.junit4.PowerMockRunner;15@RunWith(PowerMockRunner.class)16@PrepareForTest({ 4.class })17public class 4Test {18    private 4 mock1;19    private 4 mock2;20    private 4 mock3;21    public void test1() throws Exception {22        expectNew(4.class).andReturn(mock1);23        replay(4.class);24        4.newInstance();25        verifyAll();26        verifyNoMoreInteractions(mock1);27    }28    public void test2() throws Exception {29        expectNew(4.class).andReturn(mock2);30        replay(4.class);31        4.newInstance();32        verifyAll();33        verifyNoMoreInteractions(mock2);34    }35    public void test3() throws Exception {36        expectNew(4.class).andReturn(mock3);37        replay(4.class);38        4.newInstance();39        verifyAll();40        verifyNoMoreInteractions(mock3);41    }42}Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
