Fixed missing uses of ${artifactId} in urn
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.japi.Creator;
9 import akka.japi.Procedure;
10 import akka.pattern.Patterns;
11 import akka.persistence.RecoveryCompleted;
12 import akka.persistence.SaveSnapshotFailure;
13 import akka.persistence.SaveSnapshotSuccess;
14 import akka.persistence.SnapshotMetadata;
15 import akka.persistence.SnapshotOffer;
16 import akka.persistence.SnapshotSelectionCriteria;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import akka.util.Timeout;
20 import com.google.common.base.Optional;
21 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.google.protobuf.ByteString;
24 import java.io.ByteArrayInputStream;
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.ObjectInputStream;
28 import java.io.ObjectOutputStream;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import org.junit.After;
38 import org.junit.Assert;
39 import org.junit.Test;
40 import org.opendaylight.controller.cluster.DataPersistenceProvider;
41 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
42 import org.opendaylight.controller.cluster.notifications.RoleChanged;
43 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
44 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
45 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
46 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
47 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
48 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
49 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
50 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
51 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
52 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
53 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
54 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
55 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
56 import scala.concurrent.Await;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import scala.concurrent.duration.FiniteDuration;
60 import static org.junit.Assert.assertEquals;
61 import static org.junit.Assert.assertNotEquals;
62 import static org.junit.Assert.assertNotNull;
63 import static org.junit.Assert.assertNull;
64 import static org.junit.Assert.assertTrue;
65 import static org.mockito.Matchers.any;
66 import static org.mockito.Matchers.anyObject;
67 import static org.mockito.Matchers.eq;
68 import static org.mockito.Mockito.doReturn;
69 import static org.mockito.Mockito.mock;
70 import static org.mockito.Mockito.times;
71 import static org.mockito.Mockito.verify;
72
73 public class RaftActorTest extends AbstractActorTest {
74
75
76     @After
77     public void tearDown() {
78         MockAkkaJournal.clearJournal();
79         MockSnapshotStore.setMockSnapshot(null);
80     }
81
82     public static class MockRaftActor extends RaftActor {
83
84         private final DataPersistenceProvider dataPersistenceProvider;
85         private final RaftActor delegate;
86         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
87         private final List<Object> state;
88         private ActorRef roleChangeNotifier;
89
90         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
91             private static final long serialVersionUID = 1L;
92             private final Map<String, String> peerAddresses;
93             private final String id;
94             private final Optional<ConfigParams> config;
95             private final DataPersistenceProvider dataPersistenceProvider;
96             private final ActorRef roleChangeNotifier;
97
98             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
99                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
100                 ActorRef roleChangeNotifier) {
101                 this.peerAddresses = peerAddresses;
102                 this.id = id;
103                 this.config = config;
104                 this.dataPersistenceProvider = dataPersistenceProvider;
105                 this.roleChangeNotifier = roleChangeNotifier;
106             }
107
108             @Override
109             public MockRaftActor create() throws Exception {
110                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
111                     dataPersistenceProvider);
112                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
113                 return mockRaftActor;
114             }
115         }
116
117         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
118             super(id, peerAddresses, config);
119             state = new ArrayList<>();
120             this.delegate = mock(RaftActor.class);
121             if(dataPersistenceProvider == null){
122                 this.dataPersistenceProvider = new PersistentDataProvider();
123             } else {
124                 this.dataPersistenceProvider = dataPersistenceProvider;
125             }
126         }
127
128         public void waitForRecoveryComplete() {
129             try {
130                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
131             } catch (InterruptedException e) {
132                 e.printStackTrace();
133             }
134         }
135
136         public List<Object> getState() {
137             return state;
138         }
139
140         public static Props props(final String id, final Map<String, String> peerAddresses,
141                 Optional<ConfigParams> config){
142             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
143         }
144
145         public static Props props(final String id, final Map<String, String> peerAddresses,
146                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
147             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
148         }
149
150         public static Props props(final String id, final Map<String, String> peerAddresses,
151             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
152             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
153         }
154
155         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
156             delegate.applyState(clientActor, identifier, data);
157             LOG.info("applyState called");
158         }
159
160         @Override
161         protected void startLogRecoveryBatch(int maxBatchSize) {
162         }
163
164         @Override
165         protected void appendRecoveredLogEntry(Payload data) {
166             state.add(data);
167         }
168
169         @Override
170         protected void applyCurrentLogRecoveryBatch() {
171         }
172
173         @Override
174         protected void onRecoveryComplete() {
175             delegate.onRecoveryComplete();
176             recoveryComplete.countDown();
177         }
178
179         @Override
180         protected void applyRecoverySnapshot(ByteString snapshot) {
181             delegate.applyRecoverySnapshot(snapshot);
182             try {
183                 Object data = toObject(snapshot);
184                 System.out.println("!!!!!applyRecoverySnapshot: "+data);
185                 if (data instanceof List) {
186                     state.addAll((List<?>) data);
187                 }
188             } catch (Exception e) {
189                 e.printStackTrace();
190             }
191         }
192
193         @Override protected void createSnapshot() {
194             delegate.createSnapshot();
195         }
196
197         @Override protected void applySnapshot(ByteString snapshot) {
198             delegate.applySnapshot(snapshot);
199         }
200
201         @Override protected void onStateChanged() {
202             delegate.onStateChanged();
203         }
204
205         @Override
206         protected DataPersistenceProvider persistence() {
207             return this.dataPersistenceProvider;
208         }
209
210         @Override
211         protected Optional<ActorRef> getRoleChangeNotifier() {
212             return Optional.fromNullable(roleChangeNotifier);
213         }
214
215         @Override public String persistenceId() {
216             return this.getId();
217         }
218
219         private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
220             Object obj = null;
221             ByteArrayInputStream bis = null;
222             ObjectInputStream ois = null;
223             try {
224                 bis = new ByteArrayInputStream(bs.toByteArray());
225                 ois = new ObjectInputStream(bis);
226                 obj = ois.readObject();
227             } finally {
228                 if (bis != null) {
229                     bis.close();
230                 }
231                 if (ois != null) {
232                     ois.close();
233                 }
234             }
235             return obj;
236         }
237
238         public ReplicatedLog getReplicatedLog(){
239             return this.getRaftActorContext().getReplicatedLog();
240         }
241
242     }
243
244
245     private static class RaftActorTestKit extends JavaTestKit {
246         private final ActorRef raftActor;
247
248         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
249             super(actorSystem);
250
251             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
252                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
253
254         }
255
256
257         public ActorRef getRaftActor() {
258             return raftActor;
259         }
260
261         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
262             // Wait for a specific log message to show up
263             return
264                 new JavaTestKit.EventFilter<Boolean>(logEventClass
265                 ) {
266                     @Override
267                     protected Boolean run() {
268                         return true;
269                     }
270                 }.from(raftActor.path().toString())
271                     .message(message)
272                     .occurrences(1).exec();
273
274
275         }
276
277         protected void waitUntilLeader(){
278             waitUntilLeader(raftActor);
279         }
280
281         protected void waitUntilLeader(ActorRef actorRef) {
282             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
283             for(int i = 0; i < 20 * 5; i++) {
284                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
285                 try {
286                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
287                     if(resp.getLeaderActor() != null) {
288                         return;
289                     }
290                 } catch(TimeoutException e) {
291                 } catch(Exception e) {
292                     System.err.println("FindLeader threw ex");
293                     e.printStackTrace();
294                 }
295
296
297                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
298             }
299
300             Assert.fail("Leader not found for actorRef " + actorRef.path());
301         }
302
303     }
304
305
306     @Test
307     public void testConstruction() {
308         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
309     }
310
311     @Test
312     public void testFindLeaderWhenLeaderIsSelf(){
313         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
314         kit.waitUntilLeader();
315     }
316
317     @Test
318     public void testRaftActorRecovery() throws Exception {
319         new JavaTestKit(getSystem()) {{
320             String persistenceId = "follower10";
321
322             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
323             // Set the heartbeat interval high to essentially disable election otherwise the test
324             // may fail if the actor is switched to Leader and the commitIndex is set to the last
325             // log entry.
326             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
327
328             ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
329                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
330
331             watch(followerActor);
332
333             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
334             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
335                     new MockRaftActorContext.MockPayload("E"));
336             snapshotUnappliedEntries.add(entry1);
337
338             int lastAppliedDuringSnapshotCapture = 3;
339             int lastIndexDuringSnapshotCapture = 4;
340
341                 // 4 messages as part of snapshot, which are applied to state
342             ByteString snapshotBytes  = fromObject(Arrays.asList(
343                         new MockRaftActorContext.MockPayload("A"),
344                         new MockRaftActorContext.MockPayload("B"),
345                         new MockRaftActorContext.MockPayload("C"),
346                         new MockRaftActorContext.MockPayload("D")));
347
348             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
349                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
350                     lastAppliedDuringSnapshotCapture, 1);
351             MockSnapshotStore.setMockSnapshot(snapshot);
352             MockSnapshotStore.setPersistenceId(persistenceId);
353
354             // add more entries after snapshot is taken
355             List<ReplicatedLogEntry> entries = new ArrayList<>();
356             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
357                     new MockRaftActorContext.MockPayload("F"));
358             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
359                     new MockRaftActorContext.MockPayload("G"));
360             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
361                     new MockRaftActorContext.MockPayload("H"));
362             entries.add(entry2);
363             entries.add(entry3);
364             entries.add(entry4);
365
366             int lastAppliedToState = 5;
367             int lastIndex = 7;
368
369             MockAkkaJournal.addToJournal(5, entry2);
370             // 2 entries are applied to state besides the 4 entries in snapshot
371             MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
372             MockAkkaJournal.addToJournal(7, entry3);
373             MockAkkaJournal.addToJournal(8, entry4);
374
375             // kill the actor
376             followerActor.tell(PoisonPill.getInstance(), null);
377             expectMsgClass(duration("5 seconds"), Terminated.class);
378
379             unwatch(followerActor);
380
381             //reinstate the actor
382             TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
383                     MockRaftActor.props(persistenceId, Collections.<String,String>emptyMap(),
384                             Optional.<ConfigParams>of(config)));
385
386             ref.underlyingActor().waitForRecoveryComplete();
387
388             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
389             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
390                     context.getReplicatedLog().size());
391             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
392             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
393             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
394             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
395         }};
396     }
397
398     /**
399      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
400      * process recovery messages
401      *
402      * @throws Exception
403      */
404
405     @Test
406     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
407         new JavaTestKit(getSystem()) {
408             {
409                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
410
411                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
412
413                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
414
415                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
416                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
417
418                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
419
420                 // Wait for akka's recovery to complete so it doesn't interfere.
421                 mockRaftActor.waitForRecoveryComplete();
422
423                 ByteString snapshotBytes  = fromObject(Arrays.asList(
424                         new MockRaftActorContext.MockPayload("A"),
425                         new MockRaftActorContext.MockPayload("B"),
426                         new MockRaftActorContext.MockPayload("C"),
427                         new MockRaftActorContext.MockPayload("D")));
428
429                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
430                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
431
432                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
433
434                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes));
435
436                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
437
438                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
439
440                 assertEquals("add replicated log entry", 1, replicatedLog.size());
441
442                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
443
444                 assertEquals("add replicated log entry", 2, replicatedLog.size());
445
446                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
447
448                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
449
450                 // The snapshot had 4 items + we added 2 more items during the test
451                 // We start removing from 5 and we should get 1 item in the replicated log
452                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
453
454                 assertEquals("remove log entries", 1, replicatedLog.size());
455
456                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
457
458                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
459                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
460
461                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
462
463                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
464
465             }};
466     }
467
468     /**
469      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
470      * not process recovery messages
471      *
472      * @throws Exception
473      */
474     @Test
475     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
476         new JavaTestKit(getSystem()) {
477             {
478                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
479
480                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
481
482                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
483
484                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
485                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
486
487                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
488
489                 // Wait for akka's recovery to complete so it doesn't interfere.
490                 mockRaftActor.waitForRecoveryComplete();
491
492                 ByteString snapshotBytes  = fromObject(Arrays.asList(
493                         new MockRaftActorContext.MockPayload("A"),
494                         new MockRaftActorContext.MockPayload("B"),
495                         new MockRaftActorContext.MockPayload("C"),
496                         new MockRaftActorContext.MockPayload("D")));
497
498                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
499                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
500
501                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
502
503                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class));
504
505                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
506
507                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
508
509                 assertEquals("add replicated log entry", 0, replicatedLog.size());
510
511                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
512
513                 assertEquals("add replicated log entry", 0, replicatedLog.size());
514
515                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
516
517                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
518
519                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
520
521                 assertEquals("remove log entries", 0, replicatedLog.size());
522
523                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
524
525                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
526                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
527
528                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
529
530                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
531             }};
532     }
533
534
535     @Test
536     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
537         new JavaTestKit(getSystem()) {
538             {
539                 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
540
541                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
542
543                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
544
545                 CountDownLatch persistLatch = new CountDownLatch(1);
546                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
547                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
548
549                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
550                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
551
552                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
553
554                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
555
556                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
557
558                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
559
560             }
561         };
562     }
563
564     @Test
565     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
566         new JavaTestKit(getSystem()) {
567             {
568                 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
569
570                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
571
572                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
573
574                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
575
576                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
577                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
578
579                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
580
581                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
582
583                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
584
585                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
586
587                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
588
589             }
590         };
591     }
592
593     @Test
594     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
595         new JavaTestKit(getSystem()) {
596             {
597                 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
598
599                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
600
601                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
602
603                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
604
605                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
606                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
607
608                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
609
610                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
611
612                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
613
614                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
615
616                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
617
618             }
619         };
620     }
621
622     @Test
623     public void testApplyLogEntriesCallsDataPersistence() throws Exception {
624         new JavaTestKit(getSystem()) {
625             {
626                 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
627
628                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
629
630                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
631
632                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
633
634                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
635                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
636
637                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
638
639                 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
640
641                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
642
643                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
644
645             }
646         };
647     }
648
649     @Test
650     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
651         new JavaTestKit(getSystem()) {
652             {
653                 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
654
655                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
656
657                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
658
659                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
660
661                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
662                     MockRaftActor.props(persistenceId,Collections.<String,String>emptyMap(),
663                         Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
664
665                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
666
667                 ByteString snapshotBytes  = fromObject(Arrays.asList(
668                         new MockRaftActorContext.MockPayload("A"),
669                         new MockRaftActorContext.MockPayload("B"),
670                         new MockRaftActorContext.MockPayload("C"),
671                         new MockRaftActorContext.MockPayload("D")));
672
673                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
674
675                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
676
677                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
678
679                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
680
681                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
682
683                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
684
685             }
686         };
687     }
688
689     @Test
690     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
691         new JavaTestKit(getSystem()) {
692             {
693                 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
694
695                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
696
697                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
698
699                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
700
701                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
702                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
703
704                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
705
706                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
707                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
708                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
709                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
710                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
711
712                 ByteString snapshotBytes = fromObject(Arrays.asList(
713                         new MockRaftActorContext.MockPayload("A"),
714                         new MockRaftActorContext.MockPayload("B"),
715                         new MockRaftActorContext.MockPayload("C"),
716                         new MockRaftActorContext.MockPayload("D")));
717
718                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
719                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
720
721                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
722
723                 verify(mockRaftActor.delegate).createSnapshot();
724
725                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
726
727                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
728
729                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
730
731                 verify(dataPersistenceProvider).deleteMessages(100);
732
733                 assertEquals(2, mockRaftActor.getReplicatedLog().size());
734
735                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
736                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
737
738                 // Index 2 will not be in the log because it was removed due to snapshotting
739                 assertNull(mockRaftActor.getReplicatedLog().get(2));
740
741                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
742
743             }
744         };
745     }
746
747     @Test
748     public void testApplyState() throws Exception {
749
750         new JavaTestKit(getSystem()) {
751             {
752                 String persistenceId = "testApplyState";
753
754                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
755
756                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
757
758                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
759
760                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
761                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
762
763                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
764
765                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
766                         new MockRaftActorContext.MockPayload("F"));
767
768                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
769
770                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
771
772                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
773
774             }
775         };
776     }
777
778     @Test
779     public void testApplySnapshot() throws Exception {
780         new JavaTestKit(getSystem()) {
781             {
782                 String persistenceId = "testApplySnapshot";
783
784                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
785
786                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
787
788                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
789
790                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
791                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
792
793                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
794
795                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
796
797                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
798                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
799                 oldReplicatedLog.append(
800                     new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
801                         mock(Payload.class)));
802
803                 ByteString snapshotBytes = fromObject(Arrays.asList(
804                     new MockRaftActorContext.MockPayload("A"),
805                     new MockRaftActorContext.MockPayload("B"),
806                     new MockRaftActorContext.MockPayload("C"),
807                     new MockRaftActorContext.MockPayload("D")));
808
809                 Snapshot snapshot = mock(Snapshot.class);
810
811                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
812
813                 doReturn(3L).when(snapshot).getLastAppliedIndex();
814
815                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
816
817                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
818
819                 assertTrue("The replicatedLog should have changed",
820                     oldReplicatedLog != mockRaftActor.getReplicatedLog());
821
822                 assertEquals("lastApplied should be same as in the snapshot",
823                     (Long) 3L, mockRaftActor.getLastApplied());
824
825                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
826
827                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
828
829             }
830         };
831     }
832
833     @Test
834     public void testSaveSnapshotFailure() throws Exception {
835         new JavaTestKit(getSystem()) {
836             {
837                 String persistenceId = "testSaveSnapshotFailure";
838
839                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
840
841                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
842
843                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
844
845                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
846                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
847
848                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
849
850                 ByteString snapshotBytes  = fromObject(Arrays.asList(
851                         new MockRaftActorContext.MockPayload("A"),
852                         new MockRaftActorContext.MockPayload("B"),
853                         new MockRaftActorContext.MockPayload("C"),
854                         new MockRaftActorContext.MockPayload("D")));
855
856                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
857
858                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
859
860                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
861
862                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
863
864                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
865                         new Exception()));
866
867                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
868                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
869
870                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
871
872             }
873         };
874     }
875
876     @Test
877     public void testRaftRoleChangeNotifier() throws Exception {
878         new JavaTestKit(getSystem()) {{
879             ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
880             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
881             String id = "testRaftRoleChangeNotifier";
882
883             TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id,
884                 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), id);
885
886             MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
887             mockRaftActor.setCurrentBehavior(new Follower(mockRaftActor.getRaftActorContext()));
888
889             // sleeping for a minimum of 2 seconds, if it spans more its fine.
890             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
891
892             List<Object> matches =  MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
893             assertNotNull(matches);
894             assertEquals(2, matches.size());
895
896             // check if the notifier got a role change from Follower to Candidate
897             RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
898             assertEquals(id, raftRoleChanged.getMemberId());
899             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
900             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
901
902             // check if the notifier got a role change from Candidate to Leader
903             raftRoleChanged = (RoleChanged) matches.get(1);
904             assertEquals(id, raftRoleChanged.getMemberId());
905             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
906             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
907         }};
908     }
909
910     private ByteString fromObject(Object snapshot) throws Exception {
911         ByteArrayOutputStream b = null;
912         ObjectOutputStream o = null;
913         try {
914             b = new ByteArrayOutputStream();
915             o = new ObjectOutputStream(b);
916             o.writeObject(snapshot);
917             byte[] snapshotBytes = b.toByteArray();
918             return ByteString.copyFrom(snapshotBytes);
919         } finally {
920             if (o != null) {
921                 o.flush();
922                 o.close();
923             }
924             if (b != null) {
925                 b.close();
926             }
927         }
928     }
929 }