From: Kamal Rameshan Date: Mon, 15 Sep 2014 05:48:31 +0000 (-0700) Subject: Bug-1829:Commit index of follower not changed after Snapshot applied on recovery. X-Git-Tag: release/helium~63^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=4c975594eb74127ca4c0018984d569ae52be77e5;hp=7225f60c394a26143f8421b0f99f2585699fa306 Bug-1829:Commit index of follower not changed after Snapshot applied on recovery. Change-Id: Id2e30f6756e5d71886ddb8ab3f3b095f03352b0a Signed-off-by: Kamal Rameshan --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 91bbeeca50..8270f2949a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -96,7 +96,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * This context should NOT be passed directly to any other actor it is * only to be consumed by the RaftActorBehaviors */ - private RaftActorContext context; + protected RaftActorContext context; /** * The in-memory journal @@ -134,6 +134,7 @@ public abstract class RaftActor extends UntypedPersistentActor { context.setReplicatedLog(replicatedLog); context.setLastApplied(snapshot.getLastAppliedIndex()); + context.setCommitIndex(snapshot.getLastAppliedIndex()); LOG.info("Applied snapshot to replicatedLog. " + "snapshotIndex={}, snapshotTerm={}, journal-size={}", @@ -152,21 +153,22 @@ public abstract class RaftActor extends UntypedPersistentActor { applyState(null, "recovery", logEntry.getData()); context.setLastApplied(logEntry.getIndex()); context.setCommitIndex(logEntry.getIndex()); + } else if (message instanceof DeleteEntries) { replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { - context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), + ((UpdateElectionTerm) message).getVotedFor()); + } else if (message instanceof RecoveryCompleted) { - if(LOG.isDebugEnabled()) { - LOG.debug( - "RecoveryCompleted - Switching actor to Follower - " + - "Persistence Id = " + persistenceId() + - " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + - "journal-size={}", - replicatedLog.lastIndex(), replicatedLog.snapshotIndex, - replicatedLog.snapshotTerm, replicatedLog.size() - ); - } + LOG.info( + "RecoveryCompleted - Switching actor to Follower - " + + "Persistence Id = " + persistenceId() + + " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + + "journal-size={}", + replicatedLog.lastIndex(), replicatedLog.snapshotIndex, + replicatedLog.snapshotTerm, replicatedLog.size()); currentBehavior = switchBehavior(RaftState.Follower); onStateChanged(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 12123db129..9b099c2aba 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -2,18 +2,24 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.event.Logging; import akka.japi.Creator; import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; import com.google.protobuf.ByteString; import org.junit.Test; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; +import static junit.framework.Assert.assertTrue; import static junit.framework.TestCase.assertEquals; public class RaftActorTest extends AbstractActorTest { @@ -21,11 +27,21 @@ public class RaftActorTest extends AbstractActorTest { public static class MockRaftActor extends RaftActor { + boolean applySnapshotCalled = false; + public MockRaftActor(String id, Map peerAddresses) { super(id, peerAddresses); } + public RaftActorContext getRaftActorContext() { + return context; + } + + public boolean isApplySnapshotCalled() { + return applySnapshotCalled; + } + public static Props props(final String id, final Map peerAddresses){ return Props.create(new Creator(){ @@ -45,7 +61,7 @@ public class RaftActorTest extends AbstractActorTest { } @Override protected void applySnapshot(ByteString snapshot) { - throw new UnsupportedOperationException("applySnapshot"); + applySnapshotCalled = true; } @Override protected void onStateChanged() { @@ -134,5 +150,56 @@ public class RaftActorTest extends AbstractActorTest { kit.findLeader(kit.getRaftActor().path().toString()); } + @Test + public void testActorRecovery() { + new JavaTestKit(getSystem()) {{ + new Within(duration("1 seconds")) { + protected void run() { + + String persistenceId = "follower10"; + + ActorRef followerActor = getSystem().actorOf( + MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId); + + + List entries = new ArrayList<>(); + ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E")); + ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F")); + entries.add(entry1); + entries.add(entry2); + + int lastApplied = 3; + int lastIndex = 5; + Snapshot snapshot = Snapshot.create("A B C D".getBytes(), entries, lastIndex, 1 , lastApplied, 1); + MockSnapshotStore.setMockSnapshot(snapshot); + MockSnapshotStore.setPersistenceId(persistenceId); + + followerActor.tell(PoisonPill.getInstance(), null); + try { + // give some time for actor to die + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + TestActorRef ref = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, Collections.EMPTY_MAP)); + try { + //give some time for snapshot offer to get called. + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + RaftActorContext context = ref.underlyingActor().getRaftActorContext(); + assertEquals(entries.size(), context.getReplicatedLog().size()); + assertEquals(lastApplied, context.getLastApplied()); + assertEquals(lastApplied, context.getCommitIndex()); + assertTrue(ref.underlyingActor().isApplySnapshotCalled()); + } + + }; + }}; + + } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java new file mode 100644 index 0000000000..d70bf920ae --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.utils; + +import akka.dispatch.Futures; +import akka.japi.Option; +import akka.persistence.SelectedSnapshot; +import akka.persistence.SnapshotMetadata; +import akka.persistence.SnapshotSelectionCriteria; +import akka.persistence.snapshot.japi.SnapshotStore; +import org.opendaylight.controller.cluster.raft.Snapshot; +import scala.concurrent.Future; + + +public class MockSnapshotStore extends SnapshotStore { + + private static Snapshot mockSnapshot; + private static String persistenceId; + + public static void setMockSnapshot(Snapshot s) { + mockSnapshot = s; + } + + public static void setPersistenceId(String pId) { + persistenceId = pId; + } + + @Override + public Future> doLoadAsync(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) { + if (mockSnapshot == null) { + return Futures.successful(Option.none()); + } + + SnapshotMetadata smd = new SnapshotMetadata(persistenceId, 1, 12345); + SelectedSnapshot selectedSnapshot = + new SelectedSnapshot(smd, mockSnapshot); + return Futures.successful(Option.some(selectedSnapshot)); + } + + @Override + public Future doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) { + return null; + } + + @Override + public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception { + + } + + @Override + public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception { + + } + + @Override + public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) throws Exception { + + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf b/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf index 2b753004c4..6b2cc22038 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf @@ -1,4 +1,6 @@ akka { + persistence.snapshot-store.plugin = "mock-snapshot-store" + loglevel = "DEBUG" loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"] @@ -19,3 +21,10 @@ akka { } } } + +mock-snapshot-store { + # Class name of the plugin. + class = "org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" +}