Add unit tests for RaftActorRecoverySupport 12/17312/6
authorTom Pantelis <tpanteli@brocade.com>
Sat, 28 Mar 2015 15:27:06 +0000 (11:27 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 10 Apr 2015 16:28:25 +0000 (12:28 -0400)
Also removed redundant recovery tests from RaftActorTest and added a
test that verifies the RaftActor forwards all recovery messages to the
RaftActorRecoverySupport.

I also refactored MockRactorActor and RaftActorTestKit to their own
files to reduce the size of RaftActorTest.

Change-Id: I75d0a1cfe0d0b168e488ce5cecda29dae4fe4321
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java [new file with mode: 0644]

index 41a807aa355d394f659f488209f65e732646b120..030ab303393f108d98a52230e5d8de3bcc4ddb6c 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -152,8 +153,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     @Override
     public void handleRecover(Object message) {
         if(raftRecovery == null) {
-            raftRecovery = new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
-                    getRaftActorRecoveryCohort());
+            raftRecovery = newRaftActorRecoverySupport();
         }
 
         boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message);
@@ -175,6 +175,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
+        return new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
+                getRaftActorRecoveryCohort());
+    }
+
     protected void initializeBehavior(){
         changeCurrentBehavior(new Follower(context));
     }
index 3c6c8281fb734b0d378963c831c2f462fbcc3182..1289ed7f905f23919c32068a91fe0c29db0f3214 100644 (file)
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.RaftActorTest.MockRaftActor;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -184,7 +183,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     }
 
     protected void waitUntilLeader(ActorRef actorRef) {
-        RaftActorTest.RaftActorTestKit.waitUntilLeader(actorRef);
+        RaftActorTestKit.waitUntilLeader(actorRef);
     }
 
     protected TestActorRef<TestRaftActor> newTestRaftActor(String id, Map<String, String> peerAddresses,
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
new file mode 100644 (file)
index 0000000..3275e8d
--- /dev/null
@@ -0,0 +1,245 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
+
+    final RaftActor actorDelegate;
+    final RaftActorRecoveryCohort recoveryCohortDelegate;
+    final RaftActorSnapshotCohort snapshotCohortDelegate;
+    private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+    private final List<Object> state;
+    private ActorRef roleChangeNotifier;
+    private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
+    private RaftActorRecoverySupport raftActorRecoverySupport;
+
+    public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
+        private static final long serialVersionUID = 1L;
+        private final Map<String, String> peerAddresses;
+        private final String id;
+        private final Optional<ConfigParams> config;
+        private final DataPersistenceProvider dataPersistenceProvider;
+        private final ActorRef roleChangeNotifier;
+
+        private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
+            Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
+            ActorRef roleChangeNotifier) {
+            this.peerAddresses = peerAddresses;
+            this.id = id;
+            this.config = config;
+            this.dataPersistenceProvider = dataPersistenceProvider;
+            this.roleChangeNotifier = roleChangeNotifier;
+        }
+
+        @Override
+        public MockRaftActor create() throws Exception {
+            MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
+                dataPersistenceProvider);
+            mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
+            return mockRaftActor;
+        }
+    }
+
+    public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+                         DataPersistenceProvider dataPersistenceProvider) {
+        super(id, peerAddresses, config);
+        state = new ArrayList<>();
+        this.actorDelegate = mock(RaftActor.class);
+        this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
+        this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
+        if(dataPersistenceProvider == null){
+            setPersistence(true);
+        } else {
+            setPersistence(dataPersistenceProvider);
+        }
+    }
+
+    public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
+        raftActorRecoverySupport = support;
+    }
+
+    @Override
+    public RaftActorRecoverySupport newRaftActorRecoverySupport() {
+        return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
+    }
+
+    public void waitForRecoveryComplete() {
+        try {
+            assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void waitForInitializeBehaviorComplete() {
+        try {
+            assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    public void waitUntilLeader(){
+        for(int i = 0;i < 10; i++){
+            if(isLeader()){
+                break;
+            }
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public List<Object> getState() {
+        return state;
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+            Optional<ConfigParams> config){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+                              Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+        Optional<ConfigParams> config, ActorRef roleChangeNotifier){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+                              Optional<ConfigParams> config, ActorRef roleChangeNotifier,
+                              DataPersistenceProvider dataPersistenceProvider){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
+    }
+
+
+    @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+        actorDelegate.applyState(clientActor, identifier, data);
+        LOG.info("{}: applyState called", persistenceId());
+    }
+
+    @Override
+    @Nonnull
+    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+        return this;
+    }
+
+    @Override
+    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+        return this;
+    }
+
+    @Override
+    public void startLogRecoveryBatch(int maxBatchSize) {
+    }
+
+    @Override
+    public void appendRecoveredLogEntry(Payload data) {
+        state.add(data);
+    }
+
+    @Override
+    public void applyCurrentLogRecoveryBatch() {
+    }
+
+    @Override
+    protected void onRecoveryComplete() {
+        actorDelegate.onRecoveryComplete();
+        recoveryComplete.countDown();
+    }
+
+    @Override
+    protected void initializeBehavior() {
+        super.initializeBehavior();
+        initializeBehaviorComplete.countDown();
+    }
+
+    @Override
+    public void applyRecoverySnapshot(byte[] bytes) {
+        recoveryCohortDelegate.applyRecoverySnapshot(bytes);
+        try {
+            Object data = toObject(bytes);
+            if (data instanceof List) {
+                state.addAll((List<?>) data);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void createSnapshot(ActorRef actorRef) {
+        LOG.info("{}: createSnapshot called", persistenceId());
+        snapshotCohortDelegate.createSnapshot(actorRef);
+    }
+
+    @Override
+    public void applySnapshot(byte [] snapshot) {
+        LOG.info("{}: applySnapshot called", persistenceId());
+        snapshotCohortDelegate.applySnapshot(snapshot);
+    }
+
+    @Override
+    protected void onStateChanged() {
+        actorDelegate.onStateChanged();
+    }
+
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return Optional.fromNullable(roleChangeNotifier);
+    }
+
+    @Override public String persistenceId() {
+        return this.getId();
+    }
+
+    private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
+        Object obj = null;
+        ByteArrayInputStream bis = null;
+        ObjectInputStream ois = null;
+        try {
+            bis = new ByteArrayInputStream(bs);
+            ois = new ObjectInputStream(bis);
+            obj = ois.readObject();
+        } finally {
+            if (bis != null) {
+                bis.close();
+            }
+            if (ois != null) {
+                ois.close();
+            }
+        }
+        return obj;
+    }
+
+    public ReplicatedLog getReplicatedLog(){
+        return this.getRaftActorContext().getReplicatedLog();
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
new file mode 100644 (file)
index 0000000..c30de5e
--- /dev/null
@@ -0,0 +1,293 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotOffer;
+import java.util.Arrays;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for RaftActorRecoverySupport.
+ *
+ * @author Thomas Pantelis
+ */
+public class RaftActorRecoverySupportTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
+
+    @Mock
+    private DataPersistenceProvider mockPersistence;
+
+    @Mock
+    private RaftActorBehavior mockBehavior;
+
+    @Mock
+    private RaftActorRecoveryCohort mockCohort;
+
+    private RaftActorRecoverySupport support;
+
+    private RaftActorContext context;
+    private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+
+        context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG),
+                -1, -1, Collections.<String,String>emptyMap(), configParams, LOG);
+
+        support = new RaftActorRecoverySupport(mockPersistence, context , mockBehavior, mockCohort);
+
+        doReturn(true).when(mockPersistence).isRecoveryApplicable();
+
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+    }
+
+    private void sendMessageToSupport(Object message) {
+        sendMessageToSupport(message, false);
+    }
+
+    private void sendMessageToSupport(Object message, boolean expComplete) {
+        boolean complete = support.handleRecoveryMessage(message);
+        assertEquals("complete", expComplete, complete);
+    }
+
+    @Test
+    public void testOnReplicatedLogEntry() {
+        MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1", 5));
+
+        sendMessageToSupport(logEntry);
+
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
+        assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", -1, context.getLastApplied());
+        assertEquals("Commit index", -1, context.getCommitIndex());
+        assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
+    }
+
+    @Test
+    public void testOnApplyJournalEntries() {
+        configParams.setJournalRecoveryLogBatchSize(5);
+
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                2, new MockRaftActorContext.MockPayload("2")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                3, new MockRaftActorContext.MockPayload("3")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                4, new MockRaftActorContext.MockPayload("4")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                5, new MockRaftActorContext.MockPayload("5")));
+
+        sendMessageToSupport(new ApplyJournalEntries(2));
+
+        assertEquals("Last applied", 2, context.getLastApplied());
+        assertEquals("Commit index", 2, context.getCommitIndex());
+
+        sendMessageToSupport(new ApplyJournalEntries(4));
+
+        assertEquals("Last applied", 4, context.getLastApplied());
+        assertEquals("Last applied", 4, context.getLastApplied());
+
+        sendMessageToSupport(new ApplyJournalEntries(5));
+
+        assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", 5, context.getLastApplied());
+        assertEquals("Commit index", 5, context.getCommitIndex());
+        assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
+
+        InOrder inOrder = Mockito.inOrder(mockCohort);
+        inOrder.verify(mockCohort).startLogRecoveryBatch(5);
+
+        for(int i = 0; i < replicatedLog.size() - 1; i++) {
+            inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
+        }
+
+        inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
+        inOrder.verify(mockCohort).startLogRecoveryBatch(5);
+        inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
+
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    @Test
+    public void testOnApplyLogEntries() {
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+
+        sendMessageToSupport(new ApplyLogEntries(0));
+
+        assertEquals("Last applied", 0, context.getLastApplied());
+        assertEquals("Commit index", 0, context.getCommitIndex());
+    }
+
+    @Test
+    public void testOnSnapshotOffer() {
+
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                2, new MockRaftActorContext.MockPayload("2")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                3, new MockRaftActorContext.MockPayload("3")));
+
+        byte[] snapshotBytes = {1,2,3,4,5};
+
+        ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                4, new MockRaftActorContext.MockPayload("4", 4));
+
+        ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                5, new MockRaftActorContext.MockPayload("5", 5));
+
+        int lastAppliedDuringSnapshotCapture = 3;
+        int lastIndexDuringSnapshotCapture = 5;
+
+        Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
+                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
+
+        SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+
+        sendMessageToSupport(snapshotOffer);
+
+        assertEquals("Journal log size", 2, context.getReplicatedLog().size());
+        assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
+        assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
+        assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
+        assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
+
+        verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
+    }
+
+    @Test
+    public void testOnRecoveryCompletedWithRemainingBatch() {
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+
+        sendMessageToSupport(new ApplyJournalEntries(1));
+
+        sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+
+        assertEquals("Last applied", 1, context.getLastApplied());
+        assertEquals("Commit index", 1, context.getCommitIndex());
+
+        InOrder inOrder = Mockito.inOrder(mockCohort);
+        inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
+
+        for(int i = 0; i < replicatedLog.size(); i++) {
+            inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
+        }
+
+        inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
+
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    @Test
+    public void testOnRecoveryCompletedWithNoRemainingBatch() {
+        sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+
+        verifyNoMoreInteractions(mockCohort);
+    }
+
+    @Test
+    public void testOnDeleteEntries() {
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                2, new MockRaftActorContext.MockPayload("2")));
+
+        sendMessageToSupport(new DeleteEntries(1));
+
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
+    }
+
+    @Test
+    public void testUpdateElectionTerm() {
+
+        sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
+
+        assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
+    }
+
+    @Test
+    public void testRecoveryWithPersistenceDisabled() {
+        doReturn(false).when(mockPersistence).isRecoveryApplicable();
+
+        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
+
+        sendMessageToSupport(snapshotOffer);
+
+        sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                4, new MockRaftActorContext.MockPayload("4")));
+        sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                5, new MockRaftActorContext.MockPayload("5")));
+
+        sendMessageToSupport(new ApplyJournalEntries(4));
+
+        sendMessageToSupport(new DeleteEntries(5));
+
+        assertEquals("Journal log size", 0, context.getReplicatedLog().size());
+        assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", -1, context.getLastApplied());
+        assertEquals("Commit index", -1, context.getCommitIndex());
+        assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
+
+        sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
+
+        assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted For", null, context.getTermInformation().getVotedFor());
+
+        sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+
+        verifyNoMoreInteractions(mockCohort);
+    }
+}
index 14bfd1d348b69dc76332fc35ec8f8f94dd80e8db..bc6257b67d50c85b6c6cb2eb6e165d87e216ebbb 100644 (file)
@@ -2,26 +2,22 @@ package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
-import akka.japi.Creator;
 import akka.japi.Procedure;
-import akka.pattern.Patterns;
-import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
@@ -29,16 +25,11 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -48,10 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.annotation.Nonnull;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -59,6 +47,7 @@ import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -69,17 +58,12 @@ import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
-import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
-import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 public class RaftActorTest extends AbstractActorTest {
@@ -98,275 +82,6 @@ public class RaftActorTest extends AbstractActorTest {
         InMemorySnapshotStore.clear();
     }
 
-    public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
-
-        private final RaftActor actorDelegate;
-        private final RaftActorRecoveryCohort recoveryCohortDelegate;
-        private final RaftActorSnapshotCohort snapshotCohortDelegate;
-        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
-        private final List<Object> state;
-        private ActorRef roleChangeNotifier;
-        private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
-
-        public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
-            private static final long serialVersionUID = 1L;
-            private final Map<String, String> peerAddresses;
-            private final String id;
-            private final Optional<ConfigParams> config;
-            private final DataPersistenceProvider dataPersistenceProvider;
-            private final ActorRef roleChangeNotifier;
-
-            private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
-                Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
-                ActorRef roleChangeNotifier) {
-                this.peerAddresses = peerAddresses;
-                this.id = id;
-                this.config = config;
-                this.dataPersistenceProvider = dataPersistenceProvider;
-                this.roleChangeNotifier = roleChangeNotifier;
-            }
-
-            @Override
-            public MockRaftActor create() throws Exception {
-                MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
-                    dataPersistenceProvider);
-                mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
-                return mockRaftActor;
-            }
-        }
-
-        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
-                             DataPersistenceProvider dataPersistenceProvider) {
-            super(id, peerAddresses, config);
-            state = new ArrayList<>();
-            this.actorDelegate = mock(RaftActor.class);
-            this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
-            this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
-            if(dataPersistenceProvider == null){
-                setPersistence(true);
-            } else {
-                setPersistence(dataPersistenceProvider);
-            }
-        }
-
-        public void waitForRecoveryComplete() {
-            try {
-                assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-
-        public void waitForInitializeBehaviorComplete() {
-            try {
-                assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-
-
-        public void waitUntilLeader(){
-            for(int i = 0;i < 10; i++){
-                if(isLeader()){
-                    break;
-                }
-                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-            }
-        }
-
-        public List<Object> getState() {
-            return state;
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-                Optional<ConfigParams> config){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-                                  Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-            Optional<ConfigParams> config, ActorRef roleChangeNotifier){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-                                  Optional<ConfigParams> config, ActorRef roleChangeNotifier,
-                                  DataPersistenceProvider dataPersistenceProvider){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
-        }
-
-
-        @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
-            actorDelegate.applyState(clientActor, identifier, data);
-            LOG.info("{}: applyState called", persistenceId());
-        }
-
-        @Override
-        @Nonnull
-        protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
-            return this;
-        }
-
-        @Override
-        protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
-            return this;
-        }
-
-        @Override
-        public void startLogRecoveryBatch(int maxBatchSize) {
-        }
-
-        @Override
-        public void appendRecoveredLogEntry(Payload data) {
-            state.add(data);
-        }
-
-        @Override
-        public void applyCurrentLogRecoveryBatch() {
-        }
-
-        @Override
-        protected void onRecoveryComplete() {
-            actorDelegate.onRecoveryComplete();
-            recoveryComplete.countDown();
-        }
-
-        @Override
-        protected void initializeBehavior() {
-            super.initializeBehavior();
-            initializeBehaviorComplete.countDown();
-        }
-
-        @Override
-        public void applyRecoverySnapshot(byte[] bytes) {
-            recoveryCohortDelegate.applyRecoverySnapshot(bytes);
-            try {
-                Object data = toObject(bytes);
-                if (data instanceof List) {
-                    state.addAll((List<?>) data);
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        public void createSnapshot(ActorRef actorRef) {
-            LOG.info("{}: createSnapshot called", persistenceId());
-            snapshotCohortDelegate.createSnapshot(actorRef);
-        }
-
-        @Override
-        public void applySnapshot(byte [] snapshot) {
-            LOG.info("{}: applySnapshot called", persistenceId());
-            snapshotCohortDelegate.applySnapshot(snapshot);
-        }
-
-        @Override
-        protected void onStateChanged() {
-            actorDelegate.onStateChanged();
-        }
-
-        @Override
-        protected Optional<ActorRef> getRoleChangeNotifier() {
-            return Optional.fromNullable(roleChangeNotifier);
-        }
-
-        @Override public String persistenceId() {
-            return this.getId();
-        }
-
-        private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
-            Object obj = null;
-            ByteArrayInputStream bis = null;
-            ObjectInputStream ois = null;
-            try {
-                bis = new ByteArrayInputStream(bs);
-                ois = new ObjectInputStream(bis);
-                obj = ois.readObject();
-            } finally {
-                if (bis != null) {
-                    bis.close();
-                }
-                if (ois != null) {
-                    ois.close();
-                }
-            }
-            return obj;
-        }
-
-        public ReplicatedLog getReplicatedLog(){
-            return this.getRaftActorContext().getReplicatedLog();
-        }
-    }
-
-
-    public static class RaftActorTestKit extends JavaTestKit {
-        private final ActorRef raftActor;
-
-        public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
-            super(actorSystem);
-
-            raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
-                    Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
-
-        }
-
-
-        public ActorRef getRaftActor() {
-            return raftActor;
-        }
-
-        public boolean waitForLogMessage(final Class<?> logEventClass, String message){
-            // Wait for a specific log message to show up
-            return
-                new JavaTestKit.EventFilter<Boolean>(logEventClass
-                ) {
-                    @Override
-                    protected Boolean run() {
-                        return true;
-                    }
-                }.from(raftActor.path().toString())
-                    .message(message)
-                    .occurrences(1).exec();
-
-
-        }
-
-        protected void waitUntilLeader(){
-            waitUntilLeader(raftActor);
-        }
-
-        public static void waitUntilLeader(ActorRef actorRef) {
-            FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
-            for(int i = 0; i < 20 * 5; i++) {
-                Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
-                try {
-                    FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
-                    if(resp.getLeaderActor() != null) {
-                        return;
-                    }
-                } catch(TimeoutException e) {
-                } catch(Exception e) {
-                    System.err.println("FindLeader threw ex");
-                    e.printStackTrace();
-                }
-
-
-                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-            }
-
-            Assert.fail("Leader not found for actorRef " + actorRef.path());
-        }
-
-    }
-
-
     @Test
     public void testConstruction() {
         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
@@ -379,18 +94,20 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRaftActorRecovery() throws Exception {
+    public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
         new JavaTestKit(getSystem()) {{
             String persistenceId = factory.generateActorId("follower-");
 
             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
             // Set the heartbeat interval high to essentially disable election otherwise the test
             // may fail if the actor is switched to Leader and the commitIndex is set to the last
             // log entry.
             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
-                    Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
+                    ImmutableMap.<String, String>builder().put("member1", "address").build(),
+                    Optional.<ConfigParams>of(config)), persistenceId);
 
             watch(followerActor);
 
@@ -446,192 +163,95 @@ public class RaftActorTest extends AbstractActorTest {
                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
                             Optional.<ConfigParams>of(config)));
 
-            ref.underlyingActor().waitForRecoveryComplete();
+            MockRaftActor mockRaftActor = ref.underlyingActor();
 
-            RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+            mockRaftActor.waitForRecoveryComplete();
+
+            RaftActorContext context = mockRaftActor.getRaftActorContext();
             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
                     context.getReplicatedLog().size());
             assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
-            assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
+            assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
+
+            mockRaftActor.waitForInitializeBehaviorComplete();
+
+            assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
         }};
     }
 
     @Test
-    public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
+    public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
         new JavaTestKit(getSystem()) {{
-            String persistenceId = factory.generateActorId("leader-");
+            String persistenceId = factory.generateActorId("follower-");
 
             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-            // Setup the persisted journal with some entries
-            ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
-                    new MockRaftActorContext.MockPayload("zero"));
-            ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
-                    new MockRaftActorContext.MockPayload("oen"));
-            ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
-                    new MockRaftActorContext.MockPayload("two"));
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-            long seqNr = 1;
-            InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
-            InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
-            InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
-            InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
+            TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
+                    ImmutableMap.<String, String>builder().put("member1", "address").build(),
+                    Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
 
-            int lastAppliedToState = 1;
-            int lastIndex = 2;
+            MockRaftActor mockRaftActor = ref.underlyingActor();
 
-            //reinstate the actor
-            TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
-                            Optional.<ConfigParams>of(config)));
+            mockRaftActor.waitForRecoveryComplete();
 
-            leaderActor.underlyingActor().waitForRecoveryComplete();
+            mockRaftActor.waitForInitializeBehaviorComplete();
 
-            RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
-            assertEquals("Journal log size", 3, context.getReplicatedLog().size());
-            assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
-            assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
-            assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+            assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
         }};
     }
 
-    /**
-     * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
-     * process recovery messages
-     *
-     * @throws Exception
-     */
-
     @Test
-    public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+    public void testRaftActorForwardsToRaftActorRecoverySupport() {
+        String persistenceId = factory.generateActorId("leader-");
 
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                // Wait for akka's recovery to complete so it doesn't interfere.
-                mockRaftActor.waitForRecoveryComplete();
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
 
-                Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
-                        Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
+        config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-                mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
+        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+                Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
 
-                verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+        MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
+        // Wait for akka's recovery to complete so it doesn't interfere.
+        mockRaftActor.waitForRecoveryComplete();
 
-                ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
+        RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
+        mockRaftActor.setRaftActorRecoverySupport(mockSupport );
 
-                assertEquals("add replicated log entry", 1, replicatedLog.size());
+        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
+        mockRaftActor.handleRecover(snapshotOffer);
 
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
+        MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1", 5));
+        mockRaftActor.handleRecover(logEntry);
 
-                assertEquals("add replicated log entry", 2, replicatedLog.size());
+        ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
+        mockRaftActor.handleRecover(applyJournalEntries);
 
-                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
+        ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
+        mockRaftActor.handleRecover(applyLogEntries);
 
-                assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
+        DeleteEntries deleteEntries = new DeleteEntries(1);
+        mockRaftActor.handleRecover(deleteEntries);
 
-                // The snapshot had 4 items + we added 2 more items during the test
-                // We start removing from 5 and we should get 1 item in the replicated log
-                mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
+        UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
+        mockRaftActor.handleRecover(updateElectionTerm);
 
-                assertEquals("remove log entries", 1, replicatedLog.size());
-
-                mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
-
-                assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
-                assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
-
-                mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
-
-            }};
+        verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
+        verify(mockSupport).handleRecoveryMessage(same(logEntry));
+        verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
+        verify(mockSupport).handleRecoveryMessage(same(applyLogEntries));
+        verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
+        verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
     }
 
-    /**
-     * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
-     * not process recovery messages
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                // Wait for akka's recovery to complete so it doesn't interfere.
-                mockRaftActor.waitForRecoveryComplete();
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
-
-                Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
-                        Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
-
-                mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
-
-                verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
-
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
-
-                ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
-
-                assertEquals("add replicated log entry", 0, replicatedLog.size());
-
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
-
-                assertEquals("add replicated log entry", 0, replicatedLog.size());
-
-                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
-
-                assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
-
-                mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
-
-                assertEquals("remove log entries", 0, replicatedLog.size());
-
-                mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
-
-                assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
-                assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
-
-                mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
-            }};
-    }
-
-
     @Test
     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
         new JavaTestKit(getSystem()) {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java
new file mode 100644 (file)
index 0000000..3e747e3
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.Assert;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+public class RaftActorTestKit extends JavaTestKit {
+    private final ActorRef raftActor;
+
+    public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
+        super(actorSystem);
+
+        raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
+                Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
+
+    }
+
+
+    public ActorRef getRaftActor() {
+        return raftActor;
+    }
+
+    public boolean waitForLogMessage(final Class<?> logEventClass, String message){
+        // Wait for a specific log message to show up
+        return
+            new JavaTestKit.EventFilter<Boolean>(logEventClass
+            ) {
+                @Override
+                protected Boolean run() {
+                    return true;
+                }
+            }.from(raftActor.path().toString())
+                .message(message)
+                .occurrences(1).exec();
+
+
+    }
+
+    protected void waitUntilLeader(){
+        waitUntilLeader(raftActor);
+    }
+
+    public static void waitUntilLeader(ActorRef actorRef) {
+        FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
+        for(int i = 0; i < 20 * 5; i++) {
+            Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
+            try {
+                FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
+                if(resp.getLeaderActor() != null) {
+                    return;
+                }
+            } catch(TimeoutException e) {
+            } catch(Exception e) {
+                System.err.println("FindLeader threw ex");
+                e.printStackTrace();
+            }
+
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.fail("Leader not found for actorRef " + actorRef.path());
+    }
+
+}
\ No newline at end of file