Bug 1831 Batch messages on journal recovery
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
index 998c198756d191df038de6f6cc75a8091fd0d1f1..22f374319cfdbca12115fad320949c7b277a45a5 100644 (file)
@@ -4,19 +4,22 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
-
+import scala.concurrent.duration.FiniteDuration;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -27,9 +30,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.TestCase.assertEquals;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
 
 public class RaftActorTest extends AbstractActorTest {
 
@@ -42,58 +45,90 @@ public class RaftActorTest extends AbstractActorTest {
 
     public static class MockRaftActor extends RaftActor {
 
-        private boolean applySnapshotCalled = false;
-        private List<Object> state;
+        public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
+            private final Map<String, String> peerAddresses;
+            private final String id;
+            private final Optional<ConfigParams> config;
 
-        public MockRaftActor(String id,
-            Map<String, String> peerAddresses) {
-            super(id, peerAddresses);
-            state = new ArrayList<>();
+            private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
+                    Optional<ConfigParams> config) {
+                this.peerAddresses = peerAddresses;
+                this.id = id;
+                this.config = config;
+            }
+
+            @Override
+            public MockRaftActor create() throws Exception {
+                return new MockRaftActor(id, peerAddresses, config);
+            }
         }
 
-        public RaftActorContext getRaftActorContext() {
-            return context;
+        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final List<Object> state;
+
+        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config) {
+            super(id, peerAddresses, config);
+            state = new ArrayList<>();
         }
 
-        public boolean isApplySnapshotCalled() {
-            return applySnapshotCalled;
+        public void waitForRecoveryComplete() {
+            try {
+                assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
         }
 
         public List<Object> getState() {
             return state;
         }
 
-        public static Props props(final String id, final Map<String, String> peerAddresses){
-            return Props.create(new Creator<MockRaftActor>(){
-
-                @Override public MockRaftActor create() throws Exception {
-                    return new MockRaftActor(id, peerAddresses);
-                }
-            });
+        public static Props props(final String id, final Map<String, String> peerAddresses,
+                Optional<ConfigParams> config){
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config));
         }
 
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+        }
+
+        @Override
+        protected void startLogRecoveryBatch(int maxBatchSize) {
+        }
+
+        @Override
+        protected void appendRecoveredLogEntry(Payload data) {
             state.add(data);
         }
 
-        @Override protected void createSnapshot() {
-            throw new UnsupportedOperationException("createSnapshot");
+        @Override
+        protected void applyCurrentLogRecoveryBatch() {
         }
 
-        @Override protected void applySnapshot(ByteString snapshot) {
-            applySnapshotCalled = true;
+        @Override
+        protected void onRecoveryComplete() {
+            recoveryComplete.countDown();
+        }
+
+        @Override
+        protected void applyRecoverySnapshot(ByteString snapshot) {
             try {
                 Object data = toObject(snapshot);
+                System.out.println("!!!!!applyRecoverySnapshot: "+data);
                 if (data instanceof List) {
                     state.addAll((List) data);
                 }
-            } catch (ClassNotFoundException e) {
-                e.printStackTrace();
-            } catch (IOException e) {
+            } catch (Exception e) {
                 e.printStackTrace();
             }
         }
 
+        @Override protected void createSnapshot() {
+            throw new UnsupportedOperationException("createSnapshot");
+        }
+
+        @Override protected void applySnapshot(ByteString snapshot) {
+        }
+
         @Override protected void onStateChanged() {
         }
 
@@ -130,9 +165,8 @@ public class RaftActorTest extends AbstractActorTest {
         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
             super(actorSystem);
 
-            raftActor = this.getSystem()
-                .actorOf(MockRaftActor.props(actorName,
-                    Collections.EMPTY_MAP), actorName);
+            raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
+                    Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
 
         }
 
@@ -142,6 +176,7 @@ public class RaftActorTest extends AbstractActorTest {
             return
                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                 ) {
+                    @Override
                     protected Boolean run() {
                         return true;
                     }
@@ -153,37 +188,15 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         public void findLeader(final String expectedLeader){
+            raftActor.tell(new FindLeader(), getRef());
 
-
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    raftActor.tell(new FindLeader(), getRef());
-
-                    String s = new ExpectMsg<String>(duration("1 seconds"),
-                        "findLeader") {
-                        // do not put code outside this method, will run afterwards
-                        protected String match(Object in) {
-                            if (in instanceof FindLeaderReply) {
-                                return ((FindLeaderReply) in).getLeaderActor();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();// this extracts the received message
-
-                    assertEquals(expectedLeader, s);
-
-                }
-
-
-            };
+            FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
         }
 
         public ActorRef getRaftActor() {
             return raftActor;
         }
-
     }
 
 
@@ -201,89 +214,84 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRaftActorRecovery() {
+    public void testRaftActorRecovery() throws Exception {
         new JavaTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    String persistenceId = "follower10";
-
-                    ActorRef followerActor = getSystem().actorOf(
-                        MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
-
-                    List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
-                    ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
-                    snapshotUnappliedEntries.add(entry1);
-
-                    int lastAppliedDuringSnapshotCapture = 3;
-                    int lastIndexDuringSnapshotCapture = 4;
-
-                    ByteString snapshotBytes = null;
-                    try {
-                        // 4 messages as part of snapshot, which are applied to state
-                        snapshotBytes  = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"),
-                            new MockRaftActorContext.MockPayload("B"),
-                            new MockRaftActorContext.MockPayload("C"),
-                            new MockRaftActorContext.MockPayload("D")));
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                    Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
-                        snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
-                        lastAppliedDuringSnapshotCapture, 1);
-                    MockSnapshotStore.setMockSnapshot(snapshot);
-                    MockSnapshotStore.setPersistenceId(persistenceId);
-
-                    // add more entries after snapshot is taken
-                    List<ReplicatedLogEntry> entries = new ArrayList<>();
-                    ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
-                    ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G"));
-                    ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H"));
-                    entries.add(entry2);
-                    entries.add(entry3);
-                    entries.add(entry4);
-
-                    int lastAppliedToState = 5;
-                    int lastIndex = 7;
-
-                    MockAkkaJournal.addToJournal(5, entry2);
-                    // 2 entries are applied to state besides the 4 entries in snapshot
-                    MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
-                    MockAkkaJournal.addToJournal(7, entry3);
-                    MockAkkaJournal.addToJournal(8, entry4);
-
-                    // kill the actor
-                    followerActor.tell(PoisonPill.getInstance(), null);
-
-                    try {
-                        // give some time for actor to die
-                        Thread.sleep(200);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-
-                    //reinstate the actor
-                    TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
-                        MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
-
-                    try {
-                        //give some time for snapshot offer to get called.
-                        Thread.sleep(200);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-
-                    RaftActorContext context = ref.underlyingActor().getRaftActorContext();
-                    assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size());
-                    assertEquals(lastIndex, context.getReplicatedLog().lastIndex());
-                    assertEquals(lastAppliedToState, context.getLastApplied());
-                    assertEquals(lastAppliedToState, context.getCommitIndex());
-                    assertTrue(ref.underlyingActor().isApplySnapshotCalled());
-                    assertEquals(6, ref.underlyingActor().getState().size());
-                }
-            };
+            String persistenceId = "follower10";
+
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            // Set the heartbeat interval high to essentially disable election otherwise the test
+            // may fail if the actor is switched to Leader and the commitIndex is set to the last
+            // log entry.
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+            ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
+                    Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
+
+            watch(followerActor);
+
+            List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+            ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
+                    new MockRaftActorContext.MockPayload("E"));
+            snapshotUnappliedEntries.add(entry1);
+
+            int lastAppliedDuringSnapshotCapture = 3;
+            int lastIndexDuringSnapshotCapture = 4;
+
+                // 4 messages as part of snapshot, which are applied to state
+            ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+            Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+                    snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
+                    lastAppliedDuringSnapshotCapture, 1);
+            MockSnapshotStore.setMockSnapshot(snapshot);
+            MockSnapshotStore.setPersistenceId(persistenceId);
+
+            // add more entries after snapshot is taken
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+                    new MockRaftActorContext.MockPayload("F"));
+            ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+                    new MockRaftActorContext.MockPayload("G"));
+            ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+                    new MockRaftActorContext.MockPayload("H"));
+            entries.add(entry2);
+            entries.add(entry3);
+            entries.add(entry4);
+
+            int lastAppliedToState = 5;
+            int lastIndex = 7;
+
+            MockAkkaJournal.addToJournal(5, entry2);
+            // 2 entries are applied to state besides the 4 entries in snapshot
+            MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+            MockAkkaJournal.addToJournal(7, entry3);
+            MockAkkaJournal.addToJournal(8, entry4);
+
+            // kill the actor
+            followerActor.tell(PoisonPill.getInstance(), null);
+            expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            unwatch(followerActor);
+
+            //reinstate the actor
+            TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
+                    MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
+                            Optional.<ConfigParams>of(config)));
+
+            ref.underlyingActor().waitForRecoveryComplete();
+
+            RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+            assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
+                    context.getReplicatedLog().size());
+            assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+            assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+            assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+            assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
         }};
-
     }
 
     private ByteString fromObject(Object snapshot) throws Exception {