Bug-1903: On recovery, not all replicated log entries should be applied to state
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.event.Logging;
8 import akka.japi.Creator;
9 import akka.testkit.JavaTestKit;
10 import akka.testkit.TestActorRef;
11 import com.google.protobuf.ByteString;
12 import org.junit.After;
13 import org.junit.Test;
14 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
15 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
16 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
17 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
18 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
19
20 import java.io.ByteArrayInputStream;
21 import java.io.ByteArrayOutputStream;
22 import java.io.IOException;
23 import java.io.ObjectInputStream;
24 import java.io.ObjectOutputStream;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30
31 import static junit.framework.Assert.assertTrue;
32 import static junit.framework.TestCase.assertEquals;
33
34 public class RaftActorTest extends AbstractActorTest {
35
36
37     @After
38     public void tearDown() {
39         MockAkkaJournal.clearJournal();
40         MockSnapshotStore.setMockSnapshot(null);
41     }
42
43     public static class MockRaftActor extends RaftActor {
44
45         private boolean applySnapshotCalled = false;
46         private List<Object> state;
47
48         public MockRaftActor(String id,
49             Map<String, String> peerAddresses) {
50             super(id, peerAddresses);
51             state = new ArrayList<>();
52         }
53
54         public RaftActorContext getRaftActorContext() {
55             return context;
56         }
57
58         public boolean isApplySnapshotCalled() {
59             return applySnapshotCalled;
60         }
61
62         public List<Object> getState() {
63             return state;
64         }
65
66         public static Props props(final String id, final Map<String, String> peerAddresses){
67             return Props.create(new Creator<MockRaftActor>(){
68
69                 @Override public MockRaftActor create() throws Exception {
70                     return new MockRaftActor(id, peerAddresses);
71                 }
72             });
73         }
74
75         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
76             state.add(data);
77         }
78
79         @Override protected void createSnapshot() {
80             throw new UnsupportedOperationException("createSnapshot");
81         }
82
83         @Override protected void applySnapshot(ByteString snapshot) {
84             applySnapshotCalled = true;
85             try {
86                 Object data = toObject(snapshot);
87                 if (data instanceof List) {
88                     state.addAll((List) data);
89                 }
90             } catch (ClassNotFoundException e) {
91                 e.printStackTrace();
92             } catch (IOException e) {
93                 e.printStackTrace();
94             }
95         }
96
97         @Override protected void onStateChanged() {
98         }
99
100         @Override public String persistenceId() {
101             return this.getId();
102         }
103
104         private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
105             Object obj = null;
106             ByteArrayInputStream bis = null;
107             ObjectInputStream ois = null;
108             try {
109                 bis = new ByteArrayInputStream(bs.toByteArray());
110                 ois = new ObjectInputStream(bis);
111                 obj = ois.readObject();
112             } finally {
113                 if (bis != null) {
114                     bis.close();
115                 }
116                 if (ois != null) {
117                     ois.close();
118                 }
119             }
120             return obj;
121         }
122
123
124     }
125
126
127     private static class RaftActorTestKit extends JavaTestKit {
128         private final ActorRef raftActor;
129
130         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
131             super(actorSystem);
132
133             raftActor = this.getSystem()
134                 .actorOf(MockRaftActor.props(actorName,
135                     Collections.EMPTY_MAP), actorName);
136
137         }
138
139
140         public boolean waitForStartup(){
141             // Wait for a specific log message to show up
142             return
143                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
144                 ) {
145                     protected Boolean run() {
146                         return true;
147                     }
148                 }.from(raftActor.path().toString())
149                     .message("Switching from state Candidate to Leader")
150                     .occurrences(1).exec();
151
152
153         }
154
155         public void findLeader(final String expectedLeader){
156
157
158             new Within(duration("1 seconds")) {
159                 protected void run() {
160
161                     raftActor.tell(new FindLeader(), getRef());
162
163                     String s = new ExpectMsg<String>(duration("1 seconds"),
164                         "findLeader") {
165                         // do not put code outside this method, will run afterwards
166                         protected String match(Object in) {
167                             if (in instanceof FindLeaderReply) {
168                                 return ((FindLeaderReply) in).getLeaderActor();
169                             } else {
170                                 throw noMatch();
171                             }
172                         }
173                     }.get();// this extracts the received message
174
175                     assertEquals(expectedLeader, s);
176
177                 }
178
179
180             };
181         }
182
183         public ActorRef getRaftActor() {
184             return raftActor;
185         }
186
187     }
188
189
190     @Test
191     public void testConstruction() {
192         boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup();
193         assertEquals(true, started);
194     }
195
196     @Test
197     public void testFindLeaderWhenLeaderIsSelf(){
198         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
199         kit.waitForStartup();
200         kit.findLeader(kit.getRaftActor().path().toString());
201     }
202
203     @Test
204     public void testRaftActorRecovery() {
205         new JavaTestKit(getSystem()) {{
206             new Within(duration("1 seconds")) {
207                 protected void run() {
208
209                     String persistenceId = "follower10";
210
211                     ActorRef followerActor = getSystem().actorOf(
212                         MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
213
214                     List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
215                     ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
216                     snapshotUnappliedEntries.add(entry1);
217
218                     int lastAppliedDuringSnapshotCapture = 3;
219                     int lastIndexDuringSnapshotCapture = 4;
220
221                     ByteString snapshotBytes = null;
222                     try {
223                         // 4 messages as part of snapshot, which are applied to state
224                         snapshotBytes  = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"),
225                             new MockRaftActorContext.MockPayload("B"),
226                             new MockRaftActorContext.MockPayload("C"),
227                             new MockRaftActorContext.MockPayload("D")));
228                     } catch (Exception e) {
229                         e.printStackTrace();
230                     }
231                     Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
232                         snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
233                         lastAppliedDuringSnapshotCapture, 1);
234                     MockSnapshotStore.setMockSnapshot(snapshot);
235                     MockSnapshotStore.setPersistenceId(persistenceId);
236
237                     // add more entries after snapshot is taken
238                     List<ReplicatedLogEntry> entries = new ArrayList<>();
239                     ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
240                     ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G"));
241                     ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H"));
242                     entries.add(entry2);
243                     entries.add(entry3);
244                     entries.add(entry4);
245
246                     int lastAppliedToState = 5;
247                     int lastIndex = 7;
248
249                     MockAkkaJournal.addToJournal(5, entry2);
250                     // 2 entries are applied to state besides the 4 entries in snapshot
251                     MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
252                     MockAkkaJournal.addToJournal(7, entry3);
253                     MockAkkaJournal.addToJournal(8, entry4);
254
255                     // kill the actor
256                     followerActor.tell(PoisonPill.getInstance(), null);
257
258                     try {
259                         // give some time for actor to die
260                         Thread.sleep(200);
261                     } catch (InterruptedException e) {
262                         e.printStackTrace();
263                     }
264
265                     //reinstate the actor
266                     TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
267                         MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
268
269                     try {
270                         //give some time for snapshot offer to get called.
271                         Thread.sleep(200);
272                     } catch (InterruptedException e) {
273                         e.printStackTrace();
274                     }
275
276                     RaftActorContext context = ref.underlyingActor().getRaftActorContext();
277                     assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size());
278                     assertEquals(lastIndex, context.getReplicatedLog().lastIndex());
279                     assertEquals(lastAppliedToState, context.getLastApplied());
280                     assertEquals(lastAppliedToState, context.getCommitIndex());
281                     assertTrue(ref.underlyingActor().isApplySnapshotCalled());
282                     assertEquals(6, ref.underlyingActor().getState().size());
283                 }
284             };
285         }};
286
287     }
288
289     private ByteString fromObject(Object snapshot) throws Exception {
290         ByteArrayOutputStream b = null;
291         ObjectOutputStream o = null;
292         try {
293             b = new ByteArrayOutputStream();
294             o = new ObjectOutputStream(b);
295             o.writeObject(snapshot);
296             byte[] snapshotBytes = b.toByteArray();
297             return ByteString.copyFrom(snapshotBytes);
298         } finally {
299             if (o != null) {
300                 o.flush();
301                 o.close();
302             }
303             if (b != null) {
304                 b.close();
305             }
306         }
307     }
308 }