Bug-1829:Commit index of follower not changed after Snapshot applied on recovery. 82/11182/3
authorKamal Rameshan <kramesha@cisco.com>
Mon, 15 Sep 2014 05:48:31 +0000 (22:48 -0700)
committerMoiz Raja <moraja@cisco.com>
Tue, 16 Sep 2014 10:31:05 +0000 (03:31 -0700)
Change-Id: Id2e30f6756e5d71886ddb8ab3f3b095f03352b0a
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf

index 91bbeeca504b607147e39927dae1481ee9c4df6e..8270f2949a67cc9fc00f5180dce41872ca6a8a47 100644 (file)
@@ -96,7 +96,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
-    private RaftActorContext context;
+    protected RaftActorContext context;
 
     /**
      * The in-memory journal
 
     /**
      * The in-memory journal
@@ -134,6 +134,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
             context.setReplicatedLog(replicatedLog);
             context.setLastApplied(snapshot.getLastAppliedIndex());
 
             context.setReplicatedLog(replicatedLog);
             context.setLastApplied(snapshot.getLastAppliedIndex());
+            context.setCommitIndex(snapshot.getLastAppliedIndex());
 
             LOG.info("Applied snapshot to replicatedLog. " +
                     "snapshotIndex={}, snapshotTerm={}, journal-size={}",
 
             LOG.info("Applied snapshot to replicatedLog. " +
                     "snapshotIndex={}, snapshotTerm={}, journal-size={}",
@@ -152,21 +153,22 @@ public abstract class RaftActor extends UntypedPersistentActor {
             applyState(null, "recovery", logEntry.getData());
             context.setLastApplied(logEntry.getIndex());
             context.setCommitIndex(logEntry.getIndex());
             applyState(null, "recovery", logEntry.getData());
             context.setLastApplied(logEntry.getIndex());
             context.setCommitIndex(logEntry.getIndex());
+
         } else if (message instanceof DeleteEntries) {
             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
         } else if (message instanceof DeleteEntries) {
             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+
         } else if (message instanceof UpdateElectionTerm) {
         } else if (message instanceof UpdateElectionTerm) {
-            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                ((UpdateElectionTerm) message).getVotedFor());
+
         } else if (message instanceof RecoveryCompleted) {
         } else if (message instanceof RecoveryCompleted) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug(
-                    "RecoveryCompleted - Switching actor to Follower - " +
-                        "Persistence Id =  " + persistenceId() +
-                        " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
-                        "journal-size={}",
-                    replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-                    replicatedLog.snapshotTerm, replicatedLog.size()
-                );
-            }
+            LOG.info(
+                "RecoveryCompleted - Switching actor to Follower - " +
+                    "Persistence Id =  " + persistenceId() +
+                    " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+                    "journal-size={}",
+                replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+                replicatedLog.snapshotTerm, replicatedLog.size());
             currentBehavior = switchBehavior(RaftState.Follower);
             onStateChanged();
         }
             currentBehavior = switchBehavior(RaftState.Follower);
             onStateChanged();
         }
index 12123db12995061901a39a264c79f0237d78d00a..9b099c2abac8223529b750c6ea906925105ec487 100644 (file)
@@ -2,18 +2,24 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
 import com.google.protobuf.ByteString;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import com.google.protobuf.ByteString;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
 
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import java.util.Map;
 
+import static junit.framework.Assert.assertTrue;
 import static junit.framework.TestCase.assertEquals;
 
 public class RaftActorTest extends AbstractActorTest {
 import static junit.framework.TestCase.assertEquals;
 
 public class RaftActorTest extends AbstractActorTest {
@@ -21,11 +27,21 @@ public class RaftActorTest extends AbstractActorTest {
 
     public static class MockRaftActor extends RaftActor {
 
 
     public static class MockRaftActor extends RaftActor {
 
+        boolean applySnapshotCalled = false;
+
         public MockRaftActor(String id,
             Map<String, String> peerAddresses) {
             super(id, peerAddresses);
         }
 
         public MockRaftActor(String id,
             Map<String, String> peerAddresses) {
             super(id, peerAddresses);
         }
 
+        public RaftActorContext getRaftActorContext() {
+            return context;
+        }
+
+        public boolean isApplySnapshotCalled() {
+            return applySnapshotCalled;
+        }
+
         public static Props props(final String id, final Map<String, String> peerAddresses){
             return Props.create(new Creator<MockRaftActor>(){
 
         public static Props props(final String id, final Map<String, String> peerAddresses){
             return Props.create(new Creator<MockRaftActor>(){
 
@@ -45,7 +61,7 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override protected void applySnapshot(ByteString snapshot) {
         }
 
         @Override protected void applySnapshot(ByteString snapshot) {
-            throw new UnsupportedOperationException("applySnapshot");
+           applySnapshotCalled = true;
         }
 
         @Override protected void onStateChanged() {
         }
 
         @Override protected void onStateChanged() {
@@ -134,5 +150,56 @@ public class RaftActorTest extends AbstractActorTest {
         kit.findLeader(kit.getRaftActor().path().toString());
     }
 
         kit.findLeader(kit.getRaftActor().path().toString());
     }
 
+    @Test
+    public void testActorRecovery() {
+        new JavaTestKit(getSystem()) {{
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    String persistenceId = "follower10";
+
+                    ActorRef followerActor = getSystem().actorOf(
+                        MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
+
+
+                    List<ReplicatedLogEntry> entries = new ArrayList<>();
+                    ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
+                    ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
+                    entries.add(entry1);
+                    entries.add(entry2);
+
+                    int lastApplied = 3;
+                    int lastIndex = 5;
+                    Snapshot snapshot = Snapshot.create("A B C D".getBytes(), entries, lastIndex, 1 , lastApplied, 1);
+                    MockSnapshotStore.setMockSnapshot(snapshot);
+                    MockSnapshotStore.setPersistenceId(persistenceId);
+
+                    followerActor.tell(PoisonPill.getInstance(), null);
+                    try {
+                        // give some time for actor to die
+                        Thread.sleep(200);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+
+                    TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
+                    try {
+                        //give some time for snapshot offer to get called.
+                        Thread.sleep(200);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+                    assertEquals(entries.size(), context.getReplicatedLog().size());
+                    assertEquals(lastApplied, context.getLastApplied());
+                    assertEquals(lastApplied, context.getCommitIndex());
+                    assertTrue(ref.underlyingActor().isApplySnapshotCalled());
+                }
+
+            };
+        }};
+
+    }
+
 
 }
 
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java
new file mode 100644 (file)
index 0000000..d70bf92
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.utils;
+
+import akka.dispatch.Futures;
+import akka.japi.Option;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.snapshot.japi.SnapshotStore;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import scala.concurrent.Future;
+
+
+public class MockSnapshotStore  extends SnapshotStore {
+
+    private static Snapshot mockSnapshot;
+    private static String persistenceId;
+
+    public static void setMockSnapshot(Snapshot s) {
+        mockSnapshot = s;
+    }
+
+    public static void setPersistenceId(String pId) {
+        persistenceId = pId;
+    }
+
+    @Override
+    public Future<Option<SelectedSnapshot>> doLoadAsync(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) {
+        if (mockSnapshot == null) {
+            return Futures.successful(Option.<SelectedSnapshot>none());
+        }
+
+        SnapshotMetadata smd = new SnapshotMetadata(persistenceId, 1, 12345);
+        SelectedSnapshot selectedSnapshot =
+            new SelectedSnapshot(smd, mockSnapshot);
+        return Futures.successful(Option.some(selectedSnapshot));
+    }
+
+    @Override
+    public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+        return null;
+    }
+
+    @Override
+    public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+
+    }
+
+    @Override
+    public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+
+    }
+
+    @Override
+    public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) throws Exception {
+
+    }
+}
index 2b753004c48265620628fdbc58a1e41a96a65c51..6b2cc2203844198bc546ab509695d3c134a35a8b 100644 (file)
@@ -1,4 +1,6 @@
 akka {
 akka {
+    persistence.snapshot-store.plugin = "mock-snapshot-store"
+
     loglevel = "DEBUG"
     loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
 
     loglevel = "DEBUG"
     loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
 
@@ -19,3 +21,10 @@ akka {
         }
     }
 }
         }
     }
 }
+
+mock-snapshot-store {
+  # Class name of the plugin.
+  class = "org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore"
+  # Dispatcher for the plugin actor.
+  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}