import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.actor.Terminated;
import akka.event.Logging;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
-
+import scala.concurrent.duration.FiniteDuration;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.TestCase.assertEquals;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
public class RaftActorTest extends AbstractActorTest {
public static class MockRaftActor extends RaftActor {
- private boolean applySnapshotCalled = false;
- private List<Object> state;
+ public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
+ private final Map<String, String> peerAddresses;
+ private final String id;
+ private final Optional<ConfigParams> config;
- public MockRaftActor(String id,
- Map<String, String> peerAddresses) {
- super(id, peerAddresses);
- state = new ArrayList<>();
+ private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
+ Optional<ConfigParams> config) {
+ this.peerAddresses = peerAddresses;
+ this.id = id;
+ this.config = config;
+ }
+
+ @Override
+ public MockRaftActor create() throws Exception {
+ return new MockRaftActor(id, peerAddresses, config);
+ }
}
- public RaftActorContext getRaftActorContext() {
- return context;
+ private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+ private final List<Object> state;
+
+ public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config) {
+ super(id, peerAddresses, config);
+ state = new ArrayList<>();
}
- public boolean isApplySnapshotCalled() {
- return applySnapshotCalled;
+ public void waitForRecoveryComplete() {
+ try {
+ assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
public List<Object> getState() {
return state;
}
- public static Props props(final String id, final Map<String, String> peerAddresses){
- return Props.create(new Creator<MockRaftActor>(){
-
- @Override public MockRaftActor create() throws Exception {
- return new MockRaftActor(id, peerAddresses);
- }
- });
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ Optional<ConfigParams> config){
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config));
}
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+ }
+
+ @Override
+ protected void startLogRecoveryBatch(int maxBatchSize) {
+ }
+
+ @Override
+ protected void appendRecoveredLogEntry(Payload data) {
state.add(data);
}
- @Override protected void createSnapshot() {
- throw new UnsupportedOperationException("createSnapshot");
+ @Override
+ protected void applyCurrentLogRecoveryBatch() {
}
- @Override protected void applySnapshot(ByteString snapshot) {
- applySnapshotCalled = true;
+ @Override
+ protected void onRecoveryComplete() {
+ recoveryComplete.countDown();
+ }
+
+ @Override
+ protected void applyRecoverySnapshot(ByteString snapshot) {
try {
Object data = toObject(snapshot);
+ System.out.println("!!!!!applyRecoverySnapshot: "+data);
if (data instanceof List) {
state.addAll((List) data);
}
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
+ @Override protected void createSnapshot() {
+ throw new UnsupportedOperationException("createSnapshot");
+ }
+
+ @Override protected void applySnapshot(ByteString snapshot) {
+ }
+
@Override protected void onStateChanged() {
}
public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
super(actorSystem);
- raftActor = this.getSystem()
- .actorOf(MockRaftActor.props(actorName,
- Collections.EMPTY_MAP), actorName);
+ raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
}
return
new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
) {
+ @Override
protected Boolean run() {
return true;
}
}
public void findLeader(final String expectedLeader){
+ raftActor.tell(new FindLeader(), getRef());
-
- new Within(duration("1 seconds")) {
- protected void run() {
-
- raftActor.tell(new FindLeader(), getRef());
-
- String s = new ExpectMsg<String>(duration("1 seconds"),
- "findLeader") {
- // do not put code outside this method, will run afterwards
- protected String match(Object in) {
- if (in instanceof FindLeaderReply) {
- return ((FindLeaderReply) in).getLeaderActor();
- } else {
- throw noMatch();
- }
- }
- }.get();// this extracts the received message
-
- assertEquals(expectedLeader, s);
-
- }
-
-
- };
+ FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
}
public ActorRef getRaftActor() {
return raftActor;
}
-
}
}
@Test
- public void testRaftActorRecovery() {
+ public void testRaftActorRecovery() throws Exception {
new JavaTestKit(getSystem()) {{
- new Within(duration("1 seconds")) {
- protected void run() {
-
- String persistenceId = "follower10";
-
- ActorRef followerActor = getSystem().actorOf(
- MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
-
- List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
- ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
- snapshotUnappliedEntries.add(entry1);
-
- int lastAppliedDuringSnapshotCapture = 3;
- int lastIndexDuringSnapshotCapture = 4;
-
- ByteString snapshotBytes = null;
- try {
- // 4 messages as part of snapshot, which are applied to state
- snapshotBytes = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"),
- new MockRaftActorContext.MockPayload("B"),
- new MockRaftActorContext.MockPayload("C"),
- new MockRaftActorContext.MockPayload("D")));
- } catch (Exception e) {
- e.printStackTrace();
- }
- Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
- snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
- lastAppliedDuringSnapshotCapture, 1);
- MockSnapshotStore.setMockSnapshot(snapshot);
- MockSnapshotStore.setPersistenceId(persistenceId);
-
- // add more entries after snapshot is taken
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
- ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G"));
- ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H"));
- entries.add(entry2);
- entries.add(entry3);
- entries.add(entry4);
-
- int lastAppliedToState = 5;
- int lastIndex = 7;
-
- MockAkkaJournal.addToJournal(5, entry2);
- // 2 entries are applied to state besides the 4 entries in snapshot
- MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
- MockAkkaJournal.addToJournal(7, entry3);
- MockAkkaJournal.addToJournal(8, entry4);
-
- // kill the actor
- followerActor.tell(PoisonPill.getInstance(), null);
-
- try {
- // give some time for actor to die
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- //reinstate the actor
- TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
- MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
-
- try {
- //give some time for snapshot offer to get called.
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- RaftActorContext context = ref.underlyingActor().getRaftActorContext();
- assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size());
- assertEquals(lastIndex, context.getReplicatedLog().lastIndex());
- assertEquals(lastAppliedToState, context.getLastApplied());
- assertEquals(lastAppliedToState, context.getCommitIndex());
- assertTrue(ref.underlyingActor().isApplySnapshotCalled());
- assertEquals(6, ref.underlyingActor().getState().size());
- }
- };
+ String persistenceId = "follower10";
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ // Set the heartbeat interval high to essentially disable election otherwise the test
+ // may fail if the actor is switched to Leader and the commitIndex is set to the last
+ // log entry.
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
+ Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
+
+ watch(followerActor);
+
+ List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+ ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
+ new MockRaftActorContext.MockPayload("E"));
+ snapshotUnappliedEntries.add(entry1);
+
+ int lastAppliedDuringSnapshotCapture = 3;
+ int lastIndexDuringSnapshotCapture = 4;
+
+ // 4 messages as part of snapshot, which are applied to state
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
+
+ Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+ snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
+ lastAppliedDuringSnapshotCapture, 1);
+ MockSnapshotStore.setMockSnapshot(snapshot);
+ MockSnapshotStore.setPersistenceId(persistenceId);
+
+ // add more entries after snapshot is taken
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+ new MockRaftActorContext.MockPayload("F"));
+ ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("G"));
+ ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+ new MockRaftActorContext.MockPayload("H"));
+ entries.add(entry2);
+ entries.add(entry3);
+ entries.add(entry4);
+
+ int lastAppliedToState = 5;
+ int lastIndex = 7;
+
+ MockAkkaJournal.addToJournal(5, entry2);
+ // 2 entries are applied to state besides the 4 entries in snapshot
+ MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+ MockAkkaJournal.addToJournal(7, entry3);
+ MockAkkaJournal.addToJournal(8, entry4);
+
+ // kill the actor
+ followerActor.tell(PoisonPill.getInstance(), null);
+ expectMsgClass(duration("5 seconds"), Terminated.class);
+
+ unwatch(followerActor);
+
+ //reinstate the actor
+ TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
+ Optional.<ConfigParams>of(config)));
+
+ ref.underlyingActor().waitForRecoveryComplete();
+
+ RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+ assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
+ context.getReplicatedLog().size());
+ assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+ assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+ assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+ assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
}};
-
}
private ByteString fromObject(Object snapshot) throws Exception {