1 package org.opendaylight.controller.cluster.raft;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.japi.Creator;
9 import akka.japi.Procedure;
10 import akka.pattern.Patterns;
11 import akka.persistence.RecoveryCompleted;
12 import akka.persistence.SaveSnapshotFailure;
13 import akka.persistence.SaveSnapshotSuccess;
14 import akka.persistence.SnapshotMetadata;
15 import akka.persistence.SnapshotOffer;
16 import akka.persistence.SnapshotSelectionCriteria;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import akka.util.Timeout;
20 import com.google.common.base.Optional;
21 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.google.protobuf.ByteString;
24 import java.io.ByteArrayInputStream;
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.ObjectInputStream;
28 import java.io.ObjectOutputStream;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.List;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import org.junit.After;
38 import org.junit.Assert;
39 import org.junit.Test;
40 import org.opendaylight.controller.cluster.DataPersistenceProvider;
41 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
42 import org.opendaylight.controller.cluster.notifications.RoleChanged;
43 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
44 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
45 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
46 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
47 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
48 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
49 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
50 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
51 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
52 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
53 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
54 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
55 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
56 import scala.concurrent.Await;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import scala.concurrent.duration.FiniteDuration;
60 import static org.junit.Assert.assertEquals;
61 import static org.junit.Assert.assertNotEquals;
62 import static org.junit.Assert.assertNotNull;
63 import static org.junit.Assert.assertNull;
64 import static org.junit.Assert.assertTrue;
65 import static org.mockito.Matchers.any;
66 import static org.mockito.Matchers.anyObject;
67 import static org.mockito.Matchers.eq;
68 import static org.mockito.Mockito.doReturn;
69 import static org.mockito.Mockito.mock;
70 import static org.mockito.Mockito.times;
71 import static org.mockito.Mockito.verify;
73 public class RaftActorTest extends AbstractActorTest {
77 public void tearDown() {
78 MockAkkaJournal.clearJournal();
79 MockSnapshotStore.setMockSnapshot(null);
82 public static class MockRaftActor extends RaftActor {
84 private final DataPersistenceProvider dataPersistenceProvider;
85 private final RaftActor delegate;
86 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
87 private final List<Object> state;
88 private ActorRef roleChangeNotifier;
90 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
91 private static final long serialVersionUID = 1L;
92 private final Map<String, String> peerAddresses;
93 private final String id;
94 private final Optional<ConfigParams> config;
95 private final DataPersistenceProvider dataPersistenceProvider;
96 private final ActorRef roleChangeNotifier;
98 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
99 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
100 ActorRef roleChangeNotifier) {
101 this.peerAddresses = peerAddresses;
103 this.config = config;
104 this.dataPersistenceProvider = dataPersistenceProvider;
105 this.roleChangeNotifier = roleChangeNotifier;
109 public MockRaftActor create() throws Exception {
110 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
111 dataPersistenceProvider);
112 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
113 return mockRaftActor;
117 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
118 super(id, peerAddresses, config);
119 state = new ArrayList<>();
120 this.delegate = mock(RaftActor.class);
121 if(dataPersistenceProvider == null){
122 this.dataPersistenceProvider = new PersistentDataProvider();
124 this.dataPersistenceProvider = dataPersistenceProvider;
128 public void waitForRecoveryComplete() {
130 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
131 } catch (InterruptedException e) {
136 public List<Object> getState() {
140 public static Props props(final String id, final Map<String, String> peerAddresses,
141 Optional<ConfigParams> config){
142 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
145 public static Props props(final String id, final Map<String, String> peerAddresses,
146 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
147 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
150 public static Props props(final String id, final Map<String, String> peerAddresses,
151 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
152 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
155 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
156 delegate.applyState(clientActor, identifier, data);
157 LOG.info("applyState called");
161 protected void startLogRecoveryBatch(int maxBatchSize) {
165 protected void appendRecoveredLogEntry(Payload data) {
170 protected void applyCurrentLogRecoveryBatch() {
174 protected void onRecoveryComplete() {
175 delegate.onRecoveryComplete();
176 recoveryComplete.countDown();
180 protected void applyRecoverySnapshot(ByteString snapshot) {
181 delegate.applyRecoverySnapshot(snapshot);
183 Object data = toObject(snapshot);
184 System.out.println("!!!!!applyRecoverySnapshot: "+data);
185 if (data instanceof List) {
186 state.addAll((List<?>) data);
188 } catch (Exception e) {
193 @Override protected void createSnapshot() {
194 delegate.createSnapshot();
197 @Override protected void applySnapshot(ByteString snapshot) {
198 delegate.applySnapshot(snapshot);
201 @Override protected void onStateChanged() {
202 delegate.onStateChanged();
206 protected DataPersistenceProvider persistence() {
207 return this.dataPersistenceProvider;
211 protected Optional<ActorRef> getRoleChangeNotifier() {
212 return Optional.fromNullable(roleChangeNotifier);
215 @Override public String persistenceId() {
219 private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
221 ByteArrayInputStream bis = null;
222 ObjectInputStream ois = null;
224 bis = new ByteArrayInputStream(bs.toByteArray());
225 ois = new ObjectInputStream(bis);
226 obj = ois.readObject();
238 public ReplicatedLog getReplicatedLog(){
239 return this.getRaftActorContext().getReplicatedLog();
245 private static class RaftActorTestKit extends JavaTestKit {
246 private final ActorRef raftActor;
248 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
251 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
252 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
257 public ActorRef getRaftActor() {
261 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
262 // Wait for a specific log message to show up
264 new JavaTestKit.EventFilter<Boolean>(logEventClass
267 protected Boolean run() {
270 }.from(raftActor.path().toString())
272 .occurrences(1).exec();
277 protected void waitUntilLeader(){
278 waitUntilLeader(raftActor);
281 protected void waitUntilLeader(ActorRef actorRef) {
282 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
283 for(int i = 0; i < 20 * 5; i++) {
284 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
286 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
287 if(resp.getLeaderActor() != null) {
290 } catch(TimeoutException e) {
291 } catch(Exception e) {
292 System.err.println("FindLeader threw ex");
297 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
300 Assert.fail("Leader not found for actorRef " + actorRef.path());
307 public void testConstruction() {
308 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
312 public void testFindLeaderWhenLeaderIsSelf(){
313 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
314 kit.waitUntilLeader();
318 public void testRaftActorRecovery() throws Exception {
319 new JavaTestKit(getSystem()) {{
320 String persistenceId = "follower10";
322 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
323 // Set the heartbeat interval high to essentially disable election otherwise the test
324 // may fail if the actor is switched to Leader and the commitIndex is set to the last
326 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
328 ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
329 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
331 watch(followerActor);
333 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
334 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
335 new MockRaftActorContext.MockPayload("E"));
336 snapshotUnappliedEntries.add(entry1);
338 int lastAppliedDuringSnapshotCapture = 3;
339 int lastIndexDuringSnapshotCapture = 4;
341 // 4 messages as part of snapshot, which are applied to state
342 ByteString snapshotBytes = fromObject(Arrays.asList(
343 new MockRaftActorContext.MockPayload("A"),
344 new MockRaftActorContext.MockPayload("B"),
345 new MockRaftActorContext.MockPayload("C"),
346 new MockRaftActorContext.MockPayload("D")));
348 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
349 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
350 lastAppliedDuringSnapshotCapture, 1);
351 MockSnapshotStore.setMockSnapshot(snapshot);
352 MockSnapshotStore.setPersistenceId(persistenceId);
354 // add more entries after snapshot is taken
355 List<ReplicatedLogEntry> entries = new ArrayList<>();
356 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
357 new MockRaftActorContext.MockPayload("F"));
358 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
359 new MockRaftActorContext.MockPayload("G"));
360 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
361 new MockRaftActorContext.MockPayload("H"));
366 int lastAppliedToState = 5;
369 MockAkkaJournal.addToJournal(5, entry2);
370 // 2 entries are applied to state besides the 4 entries in snapshot
371 MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
372 MockAkkaJournal.addToJournal(7, entry3);
373 MockAkkaJournal.addToJournal(8, entry4);
376 followerActor.tell(PoisonPill.getInstance(), null);
377 expectMsgClass(duration("5 seconds"), Terminated.class);
379 unwatch(followerActor);
381 //reinstate the actor
382 TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
383 MockRaftActor.props(persistenceId, Collections.<String,String>emptyMap(),
384 Optional.<ConfigParams>of(config)));
386 ref.underlyingActor().waitForRecoveryComplete();
388 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
389 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
390 context.getReplicatedLog().size());
391 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
392 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
393 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
394 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
399 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
400 * process recovery messages
406 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
407 new JavaTestKit(getSystem()) {
409 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
411 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
413 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
415 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
416 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
418 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
420 // Wait for akka's recovery to complete so it doesn't interfere.
421 mockRaftActor.waitForRecoveryComplete();
423 ByteString snapshotBytes = fromObject(Arrays.asList(
424 new MockRaftActorContext.MockPayload("A"),
425 new MockRaftActorContext.MockPayload("B"),
426 new MockRaftActorContext.MockPayload("C"),
427 new MockRaftActorContext.MockPayload("D")));
429 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
430 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
432 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
434 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes));
436 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
438 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
440 assertEquals("add replicated log entry", 1, replicatedLog.size());
442 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
444 assertEquals("add replicated log entry", 2, replicatedLog.size());
446 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
448 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
450 // The snapshot had 4 items + we added 2 more items during the test
451 // We start removing from 5 and we should get 1 item in the replicated log
452 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
454 assertEquals("remove log entries", 1, replicatedLog.size());
456 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
458 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
459 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
461 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
463 mockActorRef.tell(PoisonPill.getInstance(), getRef());
469 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
470 * not process recovery messages
475 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
476 new JavaTestKit(getSystem()) {
478 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
480 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
482 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
484 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
485 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
487 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
489 // Wait for akka's recovery to complete so it doesn't interfere.
490 mockRaftActor.waitForRecoveryComplete();
492 ByteString snapshotBytes = fromObject(Arrays.asList(
493 new MockRaftActorContext.MockPayload("A"),
494 new MockRaftActorContext.MockPayload("B"),
495 new MockRaftActorContext.MockPayload("C"),
496 new MockRaftActorContext.MockPayload("D")));
498 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
499 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
501 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
503 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class));
505 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
507 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
509 assertEquals("add replicated log entry", 0, replicatedLog.size());
511 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
513 assertEquals("add replicated log entry", 0, replicatedLog.size());
515 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
517 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
519 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
521 assertEquals("remove log entries", 0, replicatedLog.size());
523 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
525 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
526 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
528 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
530 mockActorRef.tell(PoisonPill.getInstance(), getRef());
536 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
537 new JavaTestKit(getSystem()) {
539 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
541 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
543 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
545 CountDownLatch persistLatch = new CountDownLatch(1);
546 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
547 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
549 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
550 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
552 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
554 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
556 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
558 mockActorRef.tell(PoisonPill.getInstance(), getRef());
565 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
566 new JavaTestKit(getSystem()) {
568 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
570 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
572 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
574 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
576 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
577 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
579 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
581 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
583 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
585 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
587 mockActorRef.tell(PoisonPill.getInstance(), getRef());
594 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
595 new JavaTestKit(getSystem()) {
597 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
599 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
601 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
603 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
605 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
606 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
608 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
610 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
612 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
614 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
616 mockActorRef.tell(PoisonPill.getInstance(), getRef());
623 public void testApplyLogEntriesCallsDataPersistence() throws Exception {
624 new JavaTestKit(getSystem()) {
626 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
628 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
630 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
632 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
634 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
635 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
637 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
639 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
641 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
643 mockActorRef.tell(PoisonPill.getInstance(), getRef());
650 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
651 new JavaTestKit(getSystem()) {
653 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
655 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
657 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
659 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
661 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
662 MockRaftActor.props(persistenceId,Collections.<String,String>emptyMap(),
663 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
665 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
667 ByteString snapshotBytes = fromObject(Arrays.asList(
668 new MockRaftActorContext.MockPayload("A"),
669 new MockRaftActorContext.MockPayload("B"),
670 new MockRaftActorContext.MockPayload("C"),
671 new MockRaftActorContext.MockPayload("D")));
673 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
675 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
677 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
679 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
681 verify(dataPersistenceProvider).saveSnapshot(anyObject());
683 mockActorRef.tell(PoisonPill.getInstance(), getRef());
690 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
691 new JavaTestKit(getSystem()) {
693 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
695 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
697 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
699 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
701 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
702 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
704 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
706 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
707 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
708 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
709 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
710 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
712 ByteString snapshotBytes = fromObject(Arrays.asList(
713 new MockRaftActorContext.MockPayload("A"),
714 new MockRaftActorContext.MockPayload("B"),
715 new MockRaftActorContext.MockPayload("C"),
716 new MockRaftActorContext.MockPayload("D")));
718 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
719 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
721 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
723 verify(mockRaftActor.delegate).createSnapshot();
725 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
727 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
729 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
731 verify(dataPersistenceProvider).deleteMessages(100);
733 assertEquals(2, mockRaftActor.getReplicatedLog().size());
735 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
736 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
738 // Index 2 will not be in the log because it was removed due to snapshotting
739 assertNull(mockRaftActor.getReplicatedLog().get(2));
741 mockActorRef.tell(PoisonPill.getInstance(), getRef());
748 public void testApplyState() throws Exception {
750 new JavaTestKit(getSystem()) {
752 String persistenceId = "testApplyState";
754 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
756 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
758 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
760 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
761 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
763 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
765 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
766 new MockRaftActorContext.MockPayload("F"));
768 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
770 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
772 mockActorRef.tell(PoisonPill.getInstance(), getRef());
779 public void testApplySnapshot() throws Exception {
780 new JavaTestKit(getSystem()) {
782 String persistenceId = "testApplySnapshot";
784 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
786 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
788 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
790 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
791 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
793 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
795 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
797 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
798 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
799 oldReplicatedLog.append(
800 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
801 mock(Payload.class)));
803 ByteString snapshotBytes = fromObject(Arrays.asList(
804 new MockRaftActorContext.MockPayload("A"),
805 new MockRaftActorContext.MockPayload("B"),
806 new MockRaftActorContext.MockPayload("C"),
807 new MockRaftActorContext.MockPayload("D")));
809 Snapshot snapshot = mock(Snapshot.class);
811 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
813 doReturn(3L).when(snapshot).getLastAppliedIndex();
815 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
817 verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
819 assertTrue("The replicatedLog should have changed",
820 oldReplicatedLog != mockRaftActor.getReplicatedLog());
822 assertEquals("lastApplied should be same as in the snapshot",
823 (Long) 3L, mockRaftActor.getLastApplied());
825 assertEquals(0, mockRaftActor.getReplicatedLog().size());
827 mockActorRef.tell(PoisonPill.getInstance(), getRef());
834 public void testSaveSnapshotFailure() throws Exception {
835 new JavaTestKit(getSystem()) {
837 String persistenceId = "testSaveSnapshotFailure";
839 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
841 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
843 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
845 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
846 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
848 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
850 ByteString snapshotBytes = fromObject(Arrays.asList(
851 new MockRaftActorContext.MockPayload("A"),
852 new MockRaftActorContext.MockPayload("B"),
853 new MockRaftActorContext.MockPayload("C"),
854 new MockRaftActorContext.MockPayload("D")));
856 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
858 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
860 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
862 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
864 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
867 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
868 mockRaftActor.getReplicatedLog().getSnapshotIndex());
870 mockActorRef.tell(PoisonPill.getInstance(), getRef());
877 public void testRaftRoleChangeNotifier() throws Exception {
878 new JavaTestKit(getSystem()) {{
879 ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
880 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
881 String id = "testRaftRoleChangeNotifier";
883 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id,
884 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), id);
886 // sleeping for a minimum of 2 seconds, if it spans more its fine.
887 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
889 List<Object> matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
890 assertNotNull(matches);
891 assertEquals(3, matches.size());
893 // check if the notifier got a role change from null to Follower
894 RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
895 assertEquals(id, raftRoleChanged.getMemberId());
896 assertNull(raftRoleChanged.getOldRole());
897 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
899 // check if the notifier got a role change from Follower to Candidate
900 raftRoleChanged = (RoleChanged) matches.get(1);
901 assertEquals(id, raftRoleChanged.getMemberId());
902 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
903 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
905 // check if the notifier got a role change from Candidate to Leader
906 raftRoleChanged = (RoleChanged) matches.get(2);
907 assertEquals(id, raftRoleChanged.getMemberId());
908 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
909 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
913 private ByteString fromObject(Object snapshot) throws Exception {
914 ByteArrayOutputStream b = null;
915 ObjectOutputStream o = null;
917 b = new ByteArrayOutputStream();
918 o = new ObjectOutputStream(b);
919 o.writeObject(snapshot);
920 byte[] snapshotBytes = b.toByteArray();
921 return ByteString.copyFrom(snapshotBytes);