Bug 3161: Create new UpdateElectionTerm class
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyObject;
9 import static org.mockito.Matchers.eq;
10 import static org.mockito.Matchers.same;
11 import static org.mockito.Mockito.doReturn;
12 import static org.mockito.Mockito.mock;
13 import static org.mockito.Mockito.times;
14 import static org.mockito.Mockito.verify;
15 import akka.actor.ActorRef;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Terminated;
19 import akka.japi.Procedure;
20 import akka.persistence.SaveSnapshotFailure;
21 import akka.persistence.SaveSnapshotSuccess;
22 import akka.persistence.SnapshotMetadata;
23 import akka.persistence.SnapshotOffer;
24 import akka.testkit.JavaTestKit;
25 import akka.testkit.TestActorRef;
26 import com.google.common.base.Optional;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import com.google.protobuf.ByteString;
30 import java.io.ByteArrayOutputStream;
31 import java.io.ObjectOutputStream;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Collections;
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.DataPersistenceProvider;
43 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
44 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
45 import org.opendaylight.controller.cluster.notifications.RoleChanged;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
47 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
51 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
52 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
54 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
55 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
56 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
57 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
59 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
60 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
61 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
62 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65 import scala.concurrent.duration.FiniteDuration;
66
67 public class RaftActorTest extends AbstractActorTest {
68
69     static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
70
71     private TestActorFactory factory;
72
73     @Before
74     public void setUp(){
75         factory = new TestActorFactory(getSystem());
76     }
77
78     @After
79     public void tearDown() throws Exception {
80         factory.close();
81         InMemoryJournal.clear();
82         InMemorySnapshotStore.clear();
83     }
84
85     @Test
86     public void testConstruction() {
87         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
88     }
89
90     @Test
91     public void testFindLeaderWhenLeaderIsSelf(){
92         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
93         kit.waitUntilLeader();
94     }
95
96     @Test
97     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
98         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
99
100         new JavaTestKit(getSystem()) {{
101             String persistenceId = factory.generateActorId("follower-");
102
103             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
104
105             // Set the heartbeat interval high to essentially disable election otherwise the test
106             // may fail if the actor is switched to Leader and the commitIndex is set to the last
107             // log entry.
108             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
109
110             ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
111             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
112                     peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
113
114             watch(followerActor);
115
116             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
117             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
118                     new MockRaftActorContext.MockPayload("E"));
119             snapshotUnappliedEntries.add(entry1);
120
121             int lastAppliedDuringSnapshotCapture = 3;
122             int lastIndexDuringSnapshotCapture = 4;
123
124             // 4 messages as part of snapshot, which are applied to state
125             ByteString snapshotBytes = fromObject(Arrays.asList(
126                     new MockRaftActorContext.MockPayload("A"),
127                     new MockRaftActorContext.MockPayload("B"),
128                     new MockRaftActorContext.MockPayload("C"),
129                     new MockRaftActorContext.MockPayload("D")));
130
131             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
132                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
133                     lastAppliedDuringSnapshotCapture, 1);
134             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
135
136             // add more entries after snapshot is taken
137             List<ReplicatedLogEntry> entries = new ArrayList<>();
138             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
139                     new MockRaftActorContext.MockPayload("F", 2));
140             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
141                     new MockRaftActorContext.MockPayload("G", 3));
142             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
143                     new MockRaftActorContext.MockPayload("H", 4));
144             entries.add(entry2);
145             entries.add(entry3);
146             entries.add(entry4);
147
148             int lastAppliedToState = 5;
149             int lastIndex = 7;
150
151             InMemoryJournal.addEntry(persistenceId, 5, entry2);
152             // 2 entries are applied to state besides the 4 entries in snapshot
153             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
154             InMemoryJournal.addEntry(persistenceId, 7, entry3);
155             InMemoryJournal.addEntry(persistenceId, 8, entry4);
156
157             // kill the actor
158             followerActor.tell(PoisonPill.getInstance(), null);
159             expectMsgClass(duration("5 seconds"), Terminated.class);
160
161             unwatch(followerActor);
162
163             //reinstate the actor
164             TestActorRef<MockRaftActor> ref = factory.createTestActor(
165                     MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
166
167             MockRaftActor mockRaftActor = ref.underlyingActor();
168
169             mockRaftActor.waitForRecoveryComplete();
170
171             RaftActorContext context = mockRaftActor.getRaftActorContext();
172             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
173                     context.getReplicatedLog().size());
174             assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
175             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
176             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
177             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
178             assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
179
180             mockRaftActor.waitForInitializeBehaviorComplete();
181
182             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
183         }};
184
185         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
186     }
187
188     @Test
189     public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
190         new JavaTestKit(getSystem()) {{
191             String persistenceId = factory.generateActorId("follower-");
192
193             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
194
195             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
196
197             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
198                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
199                     Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
200
201             MockRaftActor mockRaftActor = ref.underlyingActor();
202
203             mockRaftActor.waitForRecoveryComplete();
204
205             mockRaftActor.waitForInitializeBehaviorComplete();
206
207             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
208         }};
209     }
210
211     @Test
212     public void testRaftActorForwardsToRaftActorRecoverySupport() {
213         String persistenceId = factory.generateActorId("leader-");
214
215         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
216
217         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
218
219         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
220                 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
221
222         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
223
224         // Wait for akka's recovery to complete so it doesn't interfere.
225         mockRaftActor.waitForRecoveryComplete();
226
227         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
228         mockRaftActor.setRaftActorRecoverySupport(mockSupport );
229
230         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
231         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
232         mockRaftActor.handleRecover(snapshotOffer);
233
234         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
235                 1, new MockRaftActorContext.MockPayload("1", 5));
236         mockRaftActor.handleRecover(logEntry);
237
238         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
239         mockRaftActor.handleRecover(applyJournalEntries);
240
241         ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
242         mockRaftActor.handleRecover(applyLogEntries);
243
244         DeleteEntries deleteEntries = new DeleteEntries(1);
245         mockRaftActor.handleRecover(deleteEntries);
246
247         org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
248                 new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
249         mockRaftActor.handleRecover(deprecatedDeleteEntries);
250
251         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
252         mockRaftActor.handleRecover(updateElectionTerm);
253
254         org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
255                 new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
256         mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
257
258         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
259         verify(mockSupport).handleRecoveryMessage(same(logEntry));
260         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
261         verify(mockSupport).handleRecoveryMessage(same(applyLogEntries));
262         verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
263         verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries));
264         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
265         verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm));
266     }
267
268     @Test
269     public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
270         String persistenceId = factory.generateActorId("leader-");
271
272         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
273
274         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
275
276         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
277
278         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
279                 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), mockSupport), persistenceId);
280
281         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
282
283         // Wait for akka's recovery to complete so it doesn't interfere.
284         mockRaftActor.waitForRecoveryComplete();
285
286         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
287         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
288         mockRaftActor.handleCommand(applySnapshot);
289
290         CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
291         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
292         mockRaftActor.handleCommand(captureSnapshot);
293
294         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
295         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
296         mockRaftActor.handleCommand(captureSnapshotReply);
297
298         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
299         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
300         mockRaftActor.handleCommand(saveSnapshotSuccess);
301
302         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
303         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
304         mockRaftActor.handleCommand(saveSnapshotFailure);
305
306         doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
307         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
308
309         verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
310         verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
311         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
312         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
313         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
314         verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
315     }
316
317     @Test
318     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
319         new JavaTestKit(getSystem()) {
320             {
321                 String persistenceId = factory.generateActorId("leader-");
322
323                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
324
325                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
326
327                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
328
329                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
330                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
331
332                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
333
334                 mockRaftActor.waitForInitializeBehaviorComplete();
335
336                 mockRaftActor.waitUntilLeader();
337
338                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
339
340                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
341
342             }
343
344         };
345     }
346
347     @Test
348     public void testApplyState() throws Exception {
349
350         new JavaTestKit(getSystem()) {
351             {
352                 String persistenceId = factory.generateActorId("leader-");
353
354                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
355
356                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
357
358                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
359
360                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
361                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
362
363                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
364
365                 mockRaftActor.waitForInitializeBehaviorComplete();
366
367                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
368                         new MockRaftActorContext.MockPayload("F"));
369
370                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
371
372                 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
373
374             }
375         };
376     }
377
378     @Test
379     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
380         new JavaTestKit(getSystem()) {{
381             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
382                     Props.create(MessageCollectorActor.class));
383             MessageCollectorActor.waitUntilReady(notifierActor);
384
385             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
386             long heartBeatInterval = 100;
387             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
388             config.setElectionTimeoutFactor(20);
389
390             String persistenceId = factory.generateActorId("notifier-");
391
392             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
393                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
394                     new NonPersistentDataProvider()), persistenceId);
395
396             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
397
398
399             // check if the notifier got a role change from null to Follower
400             RoleChanged raftRoleChanged = matches.get(0);
401             assertEquals(persistenceId, raftRoleChanged.getMemberId());
402             assertNull(raftRoleChanged.getOldRole());
403             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
404
405             // check if the notifier got a role change from Follower to Candidate
406             raftRoleChanged = matches.get(1);
407             assertEquals(persistenceId, raftRoleChanged.getMemberId());
408             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
409             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
410
411             // check if the notifier got a role change from Candidate to Leader
412             raftRoleChanged = matches.get(2);
413             assertEquals(persistenceId, raftRoleChanged.getMemberId());
414             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
415             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
416
417             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
418                     notifierActor, LeaderStateChanged.class);
419
420             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
421
422             notifierActor.underlyingActor().clear();
423
424             MockRaftActor raftActor = raftActorRef.underlyingActor();
425             final String newLeaderId = "new-leader";
426             Follower follower = new Follower(raftActor.getRaftActorContext()) {
427                 @Override
428                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
429                     leaderId = newLeaderId;
430                     return this;
431                 }
432             };
433
434             raftActor.changeCurrentBehavior(follower);
435
436             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
437             assertEquals(persistenceId, leaderStateChange.getMemberId());
438             assertEquals(null, leaderStateChange.getLeaderId());
439
440             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
441             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
442             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
443
444             notifierActor.underlyingActor().clear();
445
446             raftActor.handleCommand("any");
447
448             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
449             assertEquals(persistenceId, leaderStateChange.getMemberId());
450             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
451         }};
452     }
453
454     @Test
455     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
456         new JavaTestKit(getSystem()) {{
457             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
458             MessageCollectorActor.waitUntilReady(notifierActor);
459
460             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
461             long heartBeatInterval = 100;
462             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
463             config.setElectionTimeoutFactor(1);
464
465             String persistenceId = factory.generateActorId("notifier-");
466
467             factory.createActor(MockRaftActor.props(persistenceId,
468                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
469
470             List<RoleChanged> matches =  null;
471             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
472                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
473                 assertNotNull(matches);
474                 if(matches.size() == 3) {
475                     break;
476                 }
477                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
478             }
479
480             assertNotNull(matches);
481             assertEquals(2, matches.size());
482
483             // check if the notifier got a role change from null to Follower
484             RoleChanged raftRoleChanged = matches.get(0);
485             assertEquals(persistenceId, raftRoleChanged.getMemberId());
486             assertNull(raftRoleChanged.getOldRole());
487             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
488
489             // check if the notifier got a role change from Follower to Candidate
490             raftRoleChanged = matches.get(1);
491             assertEquals(persistenceId, raftRoleChanged.getMemberId());
492             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
493             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
494
495         }};
496     }
497
498     @Test
499     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
500         new JavaTestKit(getSystem()) {
501             {
502                 String persistenceId = factory.generateActorId("leader-");
503                 String follower1Id = factory.generateActorId("follower-");
504
505                 ActorRef followerActor1 =
506                         factory.createActor(Props.create(MessageCollectorActor.class));
507
508                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
509                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
510                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
511
512                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
513
514                 Map<String, String> peerAddresses = new HashMap<>();
515                 peerAddresses.put(follower1Id, followerActor1.path().toString());
516
517                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
518                         MockRaftActor.props(persistenceId, peerAddresses,
519                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
520
521                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
522
523                 leaderActor.getRaftActorContext().setCommitIndex(4);
524                 leaderActor.getRaftActorContext().setLastApplied(4);
525                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
526
527                 leaderActor.waitForInitializeBehaviorComplete();
528
529                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
530
531                 Leader leader = new Leader(leaderActor.getRaftActorContext());
532                 leaderActor.setCurrentBehavior(leader);
533                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
534
535                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
536                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
537
538                 assertEquals(8, leaderActor.getReplicatedLog().size());
539
540                 leaderActor.getRaftActorContext().getSnapshotManager()
541                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
542                                 new MockRaftActorContext.MockPayload("x")), 4);
543
544                 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
545
546                 assertEquals(8, leaderActor.getReplicatedLog().size());
547
548                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
549                 //fake snapshot on index 5
550                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
551
552                 assertEquals(8, leaderActor.getReplicatedLog().size());
553
554                 //fake snapshot on index 6
555                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
556                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
557                 assertEquals(8, leaderActor.getReplicatedLog().size());
558
559                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
560
561                 assertEquals(8, leaderActor.getReplicatedLog().size());
562
563                 ByteString snapshotBytes = fromObject(Arrays.asList(
564                         new MockRaftActorContext.MockPayload("foo-0"),
565                         new MockRaftActorContext.MockPayload("foo-1"),
566                         new MockRaftActorContext.MockPayload("foo-2"),
567                         new MockRaftActorContext.MockPayload("foo-3"),
568                         new MockRaftActorContext.MockPayload("foo-4")));
569
570                 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
571                         leader, Runtime.getRuntime().totalMemory());
572
573                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
574
575                 // The commit is needed to complete the snapshot creation process
576                 leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
577
578                 // capture snapshot reply should remove the snapshotted entries only
579                 assertEquals(3, leaderActor.getReplicatedLog().size());
580                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
581
582                 // add another non-replicated entry
583                 leaderActor.getReplicatedLog().append(
584                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
585
586                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
587                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
588                 assertEquals(2, leaderActor.getReplicatedLog().size());
589                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
590
591             }
592         };
593     }
594
595     @Test
596     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
597         new JavaTestKit(getSystem()) {
598             {
599                 String persistenceId = factory.generateActorId("follower-");
600                 String leaderId = factory.generateActorId("leader-");
601
602
603                 ActorRef leaderActor1 =
604                         factory.createActor(Props.create(MessageCollectorActor.class));
605
606                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
607                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
608                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
609
610                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
611
612                 Map<String, String> peerAddresses = new HashMap<>();
613                 peerAddresses.put(leaderId, leaderActor1.path().toString());
614
615                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
616                         MockRaftActor.props(persistenceId, peerAddresses,
617                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
618
619                 MockRaftActor followerActor = mockActorRef.underlyingActor();
620                 followerActor.getRaftActorContext().setCommitIndex(4);
621                 followerActor.getRaftActorContext().setLastApplied(4);
622                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
623
624                 followerActor.waitForInitializeBehaviorComplete();
625
626
627                 Follower follower = new Follower(followerActor.getRaftActorContext());
628                 followerActor.setCurrentBehavior(follower);
629                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
630
631                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
632                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
633                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
634
635                 // log has indices 0-5
636                 assertEquals(6, followerActor.getReplicatedLog().size());
637
638                 //snapshot on 4
639                 followerActor.getRaftActorContext().getSnapshotManager().capture(
640                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
641                                 new MockRaftActorContext.MockPayload("D")), 4);
642
643                 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
644
645                 assertEquals(6, followerActor.getReplicatedLog().size());
646
647                 //fake snapshot on index 6
648                 List<ReplicatedLogEntry> entries =
649                         Arrays.asList(
650                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
651                                         new MockRaftActorContext.MockPayload("foo-6"))
652                         );
653                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
654                 assertEquals(7, followerActor.getReplicatedLog().size());
655
656                 //fake snapshot on index 7
657                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
658
659                 entries =
660                         Arrays.asList(
661                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
662                                         new MockRaftActorContext.MockPayload("foo-7"))
663                         );
664                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
665                 assertEquals(8, followerActor.getReplicatedLog().size());
666
667                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
668
669
670                 ByteString snapshotBytes = fromObject(Arrays.asList(
671                         new MockRaftActorContext.MockPayload("foo-0"),
672                         new MockRaftActorContext.MockPayload("foo-1"),
673                         new MockRaftActorContext.MockPayload("foo-2"),
674                         new MockRaftActorContext.MockPayload("foo-3"),
675                         new MockRaftActorContext.MockPayload("foo-4")));
676                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
677                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
678
679                 // The commit is needed to complete the snapshot creation process
680                 followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
681
682                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
683                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
684                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
685
686                 entries =
687                         Arrays.asList(
688                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
689                                         new MockRaftActorContext.MockPayload("foo-7"))
690                         );
691                 // send an additional entry 8 with leaderCommit = 7
692                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
693
694                 // 7 and 8, as lastapplied is 7
695                 assertEquals(2, followerActor.getReplicatedLog().size());
696
697             }
698         };
699     }
700
701     @Test
702     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
703         new JavaTestKit(getSystem()) {
704             {
705                 String persistenceId = factory.generateActorId("leader-");
706                 String follower1Id = factory.generateActorId("follower-");
707                 String follower2Id = factory.generateActorId("follower-");
708
709                 ActorRef followerActor1 =
710                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
711                 ActorRef followerActor2 =
712                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
713
714                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
715                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
716                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
717
718                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
719
720                 Map<String, String> peerAddresses = new HashMap<>();
721                 peerAddresses.put(follower1Id, followerActor1.path().toString());
722                 peerAddresses.put(follower2Id, followerActor2.path().toString());
723
724                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
725                         MockRaftActor.props(persistenceId, peerAddresses,
726                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
727
728                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
729                 leaderActor.getRaftActorContext().setCommitIndex(9);
730                 leaderActor.getRaftActorContext().setLastApplied(9);
731                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
732
733                 leaderActor.waitForInitializeBehaviorComplete();
734
735                 Leader leader = new Leader(leaderActor.getRaftActorContext());
736                 leaderActor.setCurrentBehavior(leader);
737                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
738
739                 // create 5 entries in the log
740                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
741                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
742
743                 //set the snapshot index to 4 , 0 to 4 are snapshotted
744                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
745                 //setting replicatedToAllIndex = 9, for the log to clear
746                 leader.setReplicatedToAllIndex(9);
747                 assertEquals(5, leaderActor.getReplicatedLog().size());
748                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
749
750                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
751                 assertEquals(5, leaderActor.getReplicatedLog().size());
752                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
753
754                 // set the 2nd follower nextIndex to 1 which has been snapshotted
755                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
756                 assertEquals(5, leaderActor.getReplicatedLog().size());
757                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
758
759                 // simulate a real snapshot
760                 leaderActor.onReceiveCommand(new SendHeartBeat());
761                 assertEquals(5, leaderActor.getReplicatedLog().size());
762                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
763                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
764                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
765
766
767                 //reply from a slow follower does not initiate a fake snapshot
768                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
769                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
770
771                 ByteString snapshotBytes = fromObject(Arrays.asList(
772                         new MockRaftActorContext.MockPayload("foo-0"),
773                         new MockRaftActorContext.MockPayload("foo-1"),
774                         new MockRaftActorContext.MockPayload("foo-2"),
775                         new MockRaftActorContext.MockPayload("foo-3"),
776                         new MockRaftActorContext.MockPayload("foo-4")));
777                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
778                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
779
780                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
781
782                 //reply from a slow follower after should not raise errors
783                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
784                 assertEquals(0, leaderActor.getReplicatedLog().size());
785             }
786         };
787     }
788
789     @Test
790     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
791         new JavaTestKit(getSystem()) {{
792             String persistenceId = factory.generateActorId("leader-");
793             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
794             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
795             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
796             config.setSnapshotBatchCount(5);
797
798             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
799
800             Map<String, String> peerAddresses = new HashMap<>();
801
802             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
803                     MockRaftActor.props(persistenceId, peerAddresses,
804                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
805
806             MockRaftActor leaderActor = mockActorRef.underlyingActor();
807             leaderActor.getRaftActorContext().setCommitIndex(3);
808             leaderActor.getRaftActorContext().setLastApplied(3);
809             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
810
811             leaderActor.waitForInitializeBehaviorComplete();
812             for(int i=0;i< 4;i++) {
813                 leaderActor.getReplicatedLog()
814                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
815                                 new MockRaftActorContext.MockPayload("A")));
816             }
817
818             Leader leader = new Leader(leaderActor.getRaftActorContext());
819             leaderActor.setCurrentBehavior(leader);
820             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
821
822             // Persist another entry (this will cause a CaptureSnapshot to be triggered
823             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
824
825             // Now send a CaptureSnapshotReply
826             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
827
828             // Trimming log in this scenario is a no-op
829             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
830             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
831             assertEquals(-1, leader.getReplicatedToAllIndex());
832
833         }};
834     }
835
836     @Test
837     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
838         new JavaTestKit(getSystem()) {{
839             String persistenceId = factory.generateActorId("leader-");
840             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
841             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
842             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
843             config.setSnapshotBatchCount(5);
844
845             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
846
847             Map<String, String> peerAddresses = new HashMap<>();
848
849             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
850                     MockRaftActor.props(persistenceId, peerAddresses,
851                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
852
853             MockRaftActor leaderActor = mockActorRef.underlyingActor();
854             leaderActor.getRaftActorContext().setCommitIndex(3);
855             leaderActor.getRaftActorContext().setLastApplied(3);
856             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
857             leaderActor.getReplicatedLog().setSnapshotIndex(3);
858
859             leaderActor.waitForInitializeBehaviorComplete();
860             Leader leader = new Leader(leaderActor.getRaftActorContext());
861             leaderActor.setCurrentBehavior(leader);
862             leader.setReplicatedToAllIndex(3);
863             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
864
865             // Persist another entry (this will cause a CaptureSnapshot to be triggered
866             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
867
868             // Now send a CaptureSnapshotReply
869             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
870
871             // Trimming log in this scenario is a no-op
872             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
873             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
874             assertEquals(3, leader.getReplicatedToAllIndex());
875
876         }};
877     }
878
879     public static ByteString fromObject(Object snapshot) throws Exception {
880         ByteArrayOutputStream b = null;
881         ObjectOutputStream o = null;
882         try {
883             b = new ByteArrayOutputStream();
884             o = new ObjectOutputStream(b);
885             o.writeObject(snapshot);
886             byte[] snapshotBytes = b.toByteArray();
887             return ByteString.copyFrom(snapshotBytes);
888         } finally {
889             if (o != null) {
890                 o.flush();
891                 o.close();
892             }
893             if (b != null) {
894                 b.close();
895             }
896         }
897     }
898
899 }