1 package org.opendaylight.controller.cluster.raft;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.event.Logging;
8 import akka.japi.Creator;
9 import akka.testkit.JavaTestKit;
10 import akka.testkit.TestActorRef;
11 import com.google.protobuf.ByteString;
12 import org.junit.Test;
13 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
14 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
15 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.List;
22 import static junit.framework.Assert.assertTrue;
23 import static junit.framework.TestCase.assertEquals;
25 public class RaftActorTest extends AbstractActorTest {
28 public static class MockRaftActor extends RaftActor {
30 boolean applySnapshotCalled = false;
32 public MockRaftActor(String id,
33 Map<String, String> peerAddresses) {
34 super(id, peerAddresses);
37 public RaftActorContext getRaftActorContext() {
41 public boolean isApplySnapshotCalled() {
42 return applySnapshotCalled;
45 public static Props props(final String id, final Map<String, String> peerAddresses){
46 return Props.create(new Creator<MockRaftActor>(){
48 @Override public MockRaftActor create() throws Exception {
49 return new MockRaftActor(id, peerAddresses);
54 @Override protected void applyState(ActorRef clientActor,
59 @Override protected void createSnapshot() {
60 throw new UnsupportedOperationException("createSnapshot");
63 @Override protected void applySnapshot(ByteString snapshot) {
64 applySnapshotCalled = true;
67 @Override protected void onStateChanged() {
70 @Override public String persistenceId() {
77 private static class RaftActorTestKit extends JavaTestKit {
78 private final ActorRef raftActor;
80 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
83 raftActor = this.getSystem()
84 .actorOf(MockRaftActor.props(actorName,
85 Collections.EMPTY_MAP), actorName);
90 public boolean waitForStartup(){
91 // Wait for a specific log message to show up
93 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
95 protected Boolean run() {
98 }.from(raftActor.path().toString())
99 .message("Switching from state Candidate to Leader")
100 .occurrences(1).exec();
105 public void findLeader(final String expectedLeader){
108 new Within(duration("1 seconds")) {
109 protected void run() {
111 raftActor.tell(new FindLeader(), getRef());
113 String s = new ExpectMsg<String>(duration("1 seconds"),
115 // do not put code outside this method, will run afterwards
116 protected String match(Object in) {
117 if (in instanceof FindLeaderReply) {
118 return ((FindLeaderReply) in).getLeaderActor();
123 }.get();// this extracts the received message
125 assertEquals(expectedLeader, s);
133 public ActorRef getRaftActor() {
141 public void testConstruction() {
142 boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup();
143 assertEquals(true, started);
147 public void testFindLeaderWhenLeaderIsSelf(){
148 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
149 kit.waitForStartup();
150 kit.findLeader(kit.getRaftActor().path().toString());
154 public void testActorRecovery() {
155 new JavaTestKit(getSystem()) {{
156 new Within(duration("1 seconds")) {
157 protected void run() {
159 String persistenceId = "follower10";
161 ActorRef followerActor = getSystem().actorOf(
162 MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
165 List<ReplicatedLogEntry> entries = new ArrayList<>();
166 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
167 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
173 Snapshot snapshot = Snapshot.create("A B C D".getBytes(), entries, lastIndex, 1 , lastApplied, 1);
174 MockSnapshotStore.setMockSnapshot(snapshot);
175 MockSnapshotStore.setPersistenceId(persistenceId);
177 followerActor.tell(PoisonPill.getInstance(), null);
179 // give some time for actor to die
181 } catch (InterruptedException e) {
185 TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
187 //give some time for snapshot offer to get called.
189 } catch (InterruptedException e) {
192 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
193 assertEquals(entries.size(), context.getReplicatedLog().size());
194 assertEquals(lastApplied, context.getLastApplied());
195 assertEquals(lastApplied, context.getCommitIndex());
196 assertTrue(ref.underlyingActor().isApplySnapshotCalled());