* This context should NOT be passed directly to any other actor it is
* only to be consumed by the RaftActorBehaviors
*/
- private RaftActorContext context;
+ protected RaftActorContext context;
/**
* The in-memory journal
context.setReplicatedLog(replicatedLog);
context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.setCommitIndex(snapshot.getLastAppliedIndex());
LOG.info("Applied snapshot to replicatedLog. " +
"snapshotIndex={}, snapshotTerm={}, journal-size={}",
applyState(null, "recovery", logEntry.getData());
context.setLastApplied(logEntry.getIndex());
context.setCommitIndex(logEntry.getIndex());
+
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+
} else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+ ((UpdateElectionTerm) message).getVotedFor());
+
} else if (message instanceof RecoveryCompleted) {
- if(LOG.isDebugEnabled()) {
- LOG.debug(
- "RecoveryCompleted - Switching actor to Follower - " +
- "Persistence Id = " + persistenceId() +
- " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
- "journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
- replicatedLog.snapshotTerm, replicatedLog.size()
- );
- }
+ LOG.info(
+ "RecoveryCompleted - Switching actor to Follower - " +
+ "Persistence Id = " + persistenceId() +
+ " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+ "journal-size={}",
+ replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+ replicatedLog.snapshotTerm, replicatedLog.size());
currentBehavior = switchBehavior(RaftState.Follower);
onStateChanged();
}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.event.Logging;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
import com.google.protobuf.ByteString;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import static junit.framework.Assert.assertTrue;
import static junit.framework.TestCase.assertEquals;
public class RaftActorTest extends AbstractActorTest {
public static class MockRaftActor extends RaftActor {
+ boolean applySnapshotCalled = false;
+
public MockRaftActor(String id,
Map<String, String> peerAddresses) {
super(id, peerAddresses);
}
+ public RaftActorContext getRaftActorContext() {
+ return context;
+ }
+
+ public boolean isApplySnapshotCalled() {
+ return applySnapshotCalled;
+ }
+
public static Props props(final String id, final Map<String, String> peerAddresses){
return Props.create(new Creator<MockRaftActor>(){
}
@Override protected void applySnapshot(ByteString snapshot) {
- throw new UnsupportedOperationException("applySnapshot");
+ applySnapshotCalled = true;
}
@Override protected void onStateChanged() {
kit.findLeader(kit.getRaftActor().path().toString());
}
+ @Test
+ public void testActorRecovery() {
+ new JavaTestKit(getSystem()) {{
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ String persistenceId = "follower10";
+
+ ActorRef followerActor = getSystem().actorOf(
+ MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
+
+
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
+ ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
+ entries.add(entry1);
+ entries.add(entry2);
+
+ int lastApplied = 3;
+ int lastIndex = 5;
+ Snapshot snapshot = Snapshot.create("A B C D".getBytes(), entries, lastIndex, 1 , lastApplied, 1);
+ MockSnapshotStore.setMockSnapshot(snapshot);
+ MockSnapshotStore.setPersistenceId(persistenceId);
+
+ followerActor.tell(PoisonPill.getInstance(), null);
+ try {
+ // give some time for actor to die
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
+ try {
+ //give some time for snapshot offer to get called.
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+ assertEquals(entries.size(), context.getReplicatedLog().size());
+ assertEquals(lastApplied, context.getLastApplied());
+ assertEquals(lastApplied, context.getCommitIndex());
+ assertTrue(ref.underlyingActor().isApplySnapshotCalled());
+ }
+
+ };
+ }};
+
+ }
+
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.utils;
+
+import akka.dispatch.Futures;
+import akka.japi.Option;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.snapshot.japi.SnapshotStore;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import scala.concurrent.Future;
+
+
+public class MockSnapshotStore extends SnapshotStore {
+
+ private static Snapshot mockSnapshot;
+ private static String persistenceId;
+
+ public static void setMockSnapshot(Snapshot s) {
+ mockSnapshot = s;
+ }
+
+ public static void setPersistenceId(String pId) {
+ persistenceId = pId;
+ }
+
+ @Override
+ public Future<Option<SelectedSnapshot>> doLoadAsync(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) {
+ if (mockSnapshot == null) {
+ return Futures.successful(Option.<SelectedSnapshot>none());
+ }
+
+ SnapshotMetadata smd = new SnapshotMetadata(persistenceId, 1, 12345);
+ SelectedSnapshot selectedSnapshot =
+ new SelectedSnapshot(smd, mockSnapshot);
+ return Futures.successful(Option.some(selectedSnapshot));
+ }
+
+ @Override
+ public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+ return null;
+ }
+
+ @Override
+ public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+
+ }
+
+ @Override
+ public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+
+ }
+
+ @Override
+ public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) throws Exception {
+
+ }
+}
akka {
+ persistence.snapshot-store.plugin = "mock-snapshot-store"
+
loglevel = "DEBUG"
loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
}
}
}
+
+mock-snapshot-store {
+ # Class name of the plugin.
+ class = "org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore"
+ # Dispatcher for the plugin actor.
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}