Merge "Bug-1829:Commit index of follower not changed after Snapshot applied on recovery."
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.event.Logging;
8 import akka.japi.Creator;
9 import akka.testkit.JavaTestKit;
10 import akka.testkit.TestActorRef;
11 import com.google.protobuf.ByteString;
12 import org.junit.Test;
13 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
14 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
15 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
16
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Map;
21
22 import static junit.framework.Assert.assertTrue;
23 import static junit.framework.TestCase.assertEquals;
24
25 public class RaftActorTest extends AbstractActorTest {
26
27
28     public static class MockRaftActor extends RaftActor {
29
30         boolean applySnapshotCalled = false;
31
32         public MockRaftActor(String id,
33             Map<String, String> peerAddresses) {
34             super(id, peerAddresses);
35         }
36
37         public RaftActorContext getRaftActorContext() {
38             return context;
39         }
40
41         public boolean isApplySnapshotCalled() {
42             return applySnapshotCalled;
43         }
44
45         public static Props props(final String id, final Map<String, String> peerAddresses){
46             return Props.create(new Creator<MockRaftActor>(){
47
48                 @Override public MockRaftActor create() throws Exception {
49                     return new MockRaftActor(id, peerAddresses);
50                 }
51             });
52         }
53
54         @Override protected void applyState(ActorRef clientActor,
55             String identifier,
56             Object data) {
57         }
58
59         @Override protected void createSnapshot() {
60             throw new UnsupportedOperationException("createSnapshot");
61         }
62
63         @Override protected void applySnapshot(ByteString snapshot) {
64            applySnapshotCalled = true;
65         }
66
67         @Override protected void onStateChanged() {
68         }
69
70         @Override public String persistenceId() {
71             return this.getId();
72         }
73
74     }
75
76
77     private static class RaftActorTestKit extends JavaTestKit {
78         private final ActorRef raftActor;
79
80         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
81             super(actorSystem);
82
83             raftActor = this.getSystem()
84                 .actorOf(MockRaftActor.props(actorName,
85                     Collections.EMPTY_MAP), actorName);
86
87         }
88
89
90         public boolean waitForStartup(){
91             // Wait for a specific log message to show up
92             return
93                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
94                 ) {
95                     protected Boolean run() {
96                         return true;
97                     }
98                 }.from(raftActor.path().toString())
99                     .message("Switching from state Candidate to Leader")
100                     .occurrences(1).exec();
101
102
103         }
104
105         public void findLeader(final String expectedLeader){
106
107
108             new Within(duration("1 seconds")) {
109                 protected void run() {
110
111                     raftActor.tell(new FindLeader(), getRef());
112
113                     String s = new ExpectMsg<String>(duration("1 seconds"),
114                         "findLeader") {
115                         // do not put code outside this method, will run afterwards
116                         protected String match(Object in) {
117                             if (in instanceof FindLeaderReply) {
118                                 return ((FindLeaderReply) in).getLeaderActor();
119                             } else {
120                                 throw noMatch();
121                             }
122                         }
123                     }.get();// this extracts the received message
124
125                     assertEquals(expectedLeader, s);
126
127                 }
128
129
130             };
131         }
132
133         public ActorRef getRaftActor() {
134             return raftActor;
135         }
136
137     }
138
139
140     @Test
141     public void testConstruction() {
142         boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup();
143         assertEquals(true, started);
144     }
145
146     @Test
147     public void testFindLeaderWhenLeaderIsSelf(){
148         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
149         kit.waitForStartup();
150         kit.findLeader(kit.getRaftActor().path().toString());
151     }
152
153     @Test
154     public void testActorRecovery() {
155         new JavaTestKit(getSystem()) {{
156             new Within(duration("1 seconds")) {
157                 protected void run() {
158
159                     String persistenceId = "follower10";
160
161                     ActorRef followerActor = getSystem().actorOf(
162                         MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
163
164
165                     List<ReplicatedLogEntry> entries = new ArrayList<>();
166                     ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
167                     ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
168                     entries.add(entry1);
169                     entries.add(entry2);
170
171                     int lastApplied = 3;
172                     int lastIndex = 5;
173                     Snapshot snapshot = Snapshot.create("A B C D".getBytes(), entries, lastIndex, 1 , lastApplied, 1);
174                     MockSnapshotStore.setMockSnapshot(snapshot);
175                     MockSnapshotStore.setPersistenceId(persistenceId);
176
177                     followerActor.tell(PoisonPill.getInstance(), null);
178                     try {
179                         // give some time for actor to die
180                         Thread.sleep(200);
181                     } catch (InterruptedException e) {
182                         e.printStackTrace();
183                     }
184
185                     TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
186                     try {
187                         //give some time for snapshot offer to get called.
188                         Thread.sleep(200);
189                     } catch (InterruptedException e) {
190                         e.printStackTrace();
191                     }
192                     RaftActorContext context = ref.underlyingActor().getRaftActorContext();
193                     assertEquals(entries.size(), context.getReplicatedLog().size());
194                     assertEquals(lastApplied, context.getLastApplied());
195                     assertEquals(lastApplied, context.getCommitIndex());
196                     assertTrue(ref.underlyingActor().isApplySnapshotCalled());
197                 }
198
199             };
200         }};
201
202     }
203
204
205 }