2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static org.junit.Assert.assertArrayEquals;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNotSame;
15 import static org.junit.Assert.assertNull;
16 import static org.junit.Assert.assertSame;
17 import static org.junit.Assert.assertTrue;
18 import static org.mockito.Matchers.any;
19 import static org.mockito.Matchers.anyObject;
20 import static org.mockito.Matchers.eq;
21 import static org.mockito.Matchers.same;
22 import static org.mockito.Mockito.doNothing;
23 import static org.mockito.Mockito.doReturn;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.never;
26 import static org.mockito.Mockito.reset;
27 import static org.mockito.Mockito.timeout;
28 import static org.mockito.Mockito.verify;
29 import akka.actor.ActorRef;
30 import akka.actor.PoisonPill;
31 import akka.actor.Props;
32 import akka.actor.Status.Failure;
33 import akka.actor.Terminated;
34 import akka.dispatch.Dispatchers;
35 import akka.japi.Procedure;
36 import akka.persistence.SaveSnapshotFailure;
37 import akka.persistence.SaveSnapshotSuccess;
38 import akka.persistence.SnapshotMetadata;
39 import akka.persistence.SnapshotOffer;
40 import akka.persistence.SnapshotSelectionCriteria;
41 import akka.testkit.JavaTestKit;
42 import akka.testkit.TestActorRef;
43 import com.google.common.base.Optional;
44 import com.google.common.collect.ImmutableMap;
45 import com.google.common.util.concurrent.Uninterruptibles;
46 import com.google.protobuf.ByteString;
47 import java.io.ByteArrayOutputStream;
48 import java.io.ObjectOutputStream;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collections;
52 import java.util.HashMap;
53 import java.util.List;
55 import java.util.concurrent.TimeUnit;
56 import java.util.concurrent.TimeoutException;
57 import org.apache.commons.lang3.SerializationUtils;
58 import org.junit.After;
59 import org.junit.Before;
60 import org.junit.Test;
61 import org.mockito.ArgumentCaptor;
62 import org.opendaylight.controller.cluster.DataPersistenceProvider;
63 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
64 import org.opendaylight.controller.cluster.PersistentDataProvider;
65 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
66 import org.opendaylight.controller.cluster.notifications.RoleChanged;
67 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
68 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
70 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
71 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
73 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
74 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
75 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
76 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
77 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
78 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
79 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
80 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
81 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
82 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
83 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
84 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
85 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
86 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
87 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
88 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
89 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
90 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
91 import org.slf4j.Logger;
92 import org.slf4j.LoggerFactory;
93 import scala.concurrent.duration.Duration;
94 import scala.concurrent.duration.FiniteDuration;
96 public class RaftActorTest extends AbstractActorTest {
98 static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
100 private TestActorFactory factory;
104 factory = new TestActorFactory(getSystem());
107 @SuppressWarnings("unchecked")
108 private static DataPersistenceProvider mockPersistenceProvider() {
109 final DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
110 doReturn(false).when(dataPersistenceProvider).isRecoveryApplicable();
111 doReturn(0L).when(dataPersistenceProvider).getLastSequenceNumber();
112 doNothing().when(dataPersistenceProvider).saveSnapshot(any(Object.class));
113 doNothing().when(dataPersistenceProvider).persist(any(Object.class), any(Procedure.class));
114 doNothing().when(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
115 doNothing().when(dataPersistenceProvider).deleteMessages(0L);
117 return dataPersistenceProvider;
121 public void tearDown() throws Exception {
123 InMemoryJournal.clear();
124 InMemorySnapshotStore.clear();
128 public void testConstruction() {
129 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
133 public void testFindLeaderWhenLeaderIsSelf(){
134 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
135 kit.waitUntilLeader();
140 public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
141 TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
143 new JavaTestKit(getSystem()) {{
144 String persistenceId = factory.generateActorId("follower-");
146 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
148 // Set the heartbeat interval high to essentially disable election otherwise the test
149 // may fail if the actor is switched to Leader and the commitIndex is set to the last
151 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
153 ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
154 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
155 peerAddresses, config), persistenceId);
157 watch(followerActor);
159 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
160 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
161 new MockRaftActorContext.MockPayload("E"));
162 snapshotUnappliedEntries.add(entry1);
164 int lastAppliedDuringSnapshotCapture = 3;
165 int lastIndexDuringSnapshotCapture = 4;
167 // 4 messages as part of snapshot, which are applied to state
168 ByteString snapshotBytes = fromObject(Arrays.asList(
169 new MockRaftActorContext.MockPayload("A"),
170 new MockRaftActorContext.MockPayload("B"),
171 new MockRaftActorContext.MockPayload("C"),
172 new MockRaftActorContext.MockPayload("D")));
174 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
175 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
176 lastAppliedDuringSnapshotCapture, 1);
177 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
179 // add more entries after snapshot is taken
180 List<ReplicatedLogEntry> entries = new ArrayList<>();
181 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
182 new MockRaftActorContext.MockPayload("F", 2));
183 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
184 new MockRaftActorContext.MockPayload("G", 3));
185 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
186 new MockRaftActorContext.MockPayload("H", 4));
191 int lastAppliedToState = 5;
194 InMemoryJournal.addEntry(persistenceId, 5, entry2);
195 // 2 entries are applied to state besides the 4 entries in snapshot
196 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
197 InMemoryJournal.addEntry(persistenceId, 7, entry3);
198 InMemoryJournal.addEntry(persistenceId, 8, entry4);
201 followerActor.tell(PoisonPill.getInstance(), null);
202 expectMsgClass(duration("5 seconds"), Terminated.class);
204 unwatch(followerActor);
206 //reinstate the actor
207 TestActorRef<MockRaftActor> ref = factory.createTestActor(
208 MockRaftActor.props(persistenceId, peerAddresses, config));
210 MockRaftActor mockRaftActor = ref.underlyingActor();
212 mockRaftActor.waitForRecoveryComplete();
214 RaftActorContext context = mockRaftActor.getRaftActorContext();
215 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
216 context.getReplicatedLog().size());
217 assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
218 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
219 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
220 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
221 assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
223 mockRaftActor.waitForInitializeBehaviorComplete();
225 assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
228 TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
232 public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
233 new JavaTestKit(getSystem()) {{
234 String persistenceId = factory.generateActorId("follower-");
236 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
238 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
240 TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
241 ImmutableMap.<String, String>builder().put("member1", "address").build(),
242 config, new NonPersistentDataProvider()), persistenceId);
244 MockRaftActor mockRaftActor = ref.underlyingActor();
246 mockRaftActor.waitForRecoveryComplete();
248 mockRaftActor.waitForInitializeBehaviorComplete();
250 assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
255 public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
256 new JavaTestKit(getSystem()) {{
257 String persistenceId = factory.generateActorId("follower-");
258 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
259 config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
260 config.setElectionTimeoutFactor(1);
262 InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
264 TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
265 ImmutableMap.<String, String>builder().put("member1", "address").build(),
266 config, new NonPersistentDataProvider()).
267 withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
269 InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
270 List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
271 assertEquals("UpdateElectionTerm entries", 1, entries.size());
272 UpdateElectionTerm updateEntry = entries.get(0);
274 factory.killActor(ref, this);
276 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
277 ref = factory.createTestActor(MockRaftActor.props(persistenceId,
278 ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
279 new NonPersistentDataProvider()).
280 withDispatcher(Dispatchers.DefaultDispatcherId()),
281 factory.generateActorId("follower-"));
283 MockRaftActor actor = ref.underlyingActor();
284 actor.waitForRecoveryComplete();
286 RaftActorContext newContext = actor.getRaftActorContext();
287 assertEquals("electionTerm", updateEntry.getCurrentTerm(),
288 newContext.getTermInformation().getCurrentTerm());
289 assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
291 entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
292 assertEquals("UpdateElectionTerm entries", 1, entries.size());
297 public void testRaftActorForwardsToRaftActorRecoverySupport() {
298 String persistenceId = factory.generateActorId("leader-");
300 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
302 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
304 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
305 Collections.<String, String>emptyMap(), config), persistenceId);
307 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
309 // Wait for akka's recovery to complete so it doesn't interfere.
310 mockRaftActor.waitForRecoveryComplete();
312 RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
313 doReturn(false).when(mockSupport).handleRecoveryMessage(any(Object.class), any(PersistentDataProvider.class));
314 mockRaftActor.setRaftActorRecoverySupport(mockSupport);
316 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
317 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
318 mockRaftActor.handleRecover(snapshotOffer);
320 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
321 1, new MockRaftActorContext.MockPayload("1", 5));
322 mockRaftActor.handleRecover(logEntry);
324 ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
325 mockRaftActor.handleRecover(applyJournalEntries);
327 ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
328 mockRaftActor.handleRecover(applyLogEntries);
330 DeleteEntries deleteEntries = new DeleteEntries(1);
331 mockRaftActor.handleRecover(deleteEntries);
333 org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
334 new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
335 mockRaftActor.handleRecover(deprecatedDeleteEntries);
337 UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
338 mockRaftActor.handleRecover(updateElectionTerm);
340 org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
341 new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
342 mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
344 verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
345 verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
346 verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
347 verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class));
348 verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
349 verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
350 verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
351 verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class));
355 public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
356 String persistenceId = factory.generateActorId("leader-");
358 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
360 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
362 RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
364 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
365 config(config).snapshotMessageSupport(mockSupport).props());
367 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
369 // Wait for akka's recovery to complete so it doesn't interfere.
370 mockRaftActor.waitForRecoveryComplete();
372 ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
373 doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
374 mockRaftActor.handleCommand(applySnapshot);
376 CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
377 doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
378 mockRaftActor.handleCommand(captureSnapshot);
380 CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
381 doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
382 mockRaftActor.handleCommand(captureSnapshotReply);
384 SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
385 doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
386 mockRaftActor.handleCommand(saveSnapshotSuccess);
388 SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
389 doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
390 mockRaftActor.handleCommand(saveSnapshotFailure);
392 doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
393 any(ActorRef.class));
394 mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
396 doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
397 mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
399 verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
400 verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
401 verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
402 verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
403 verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
404 verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
405 any(ActorRef.class));
406 verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
410 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
411 new JavaTestKit(getSystem()) {
413 String persistenceId = factory.generateActorId("leader-");
415 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
417 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
419 DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
421 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
422 Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
424 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
426 mockRaftActor.waitForInitializeBehaviorComplete();
428 mockRaftActor.waitUntilLeader();
430 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
432 verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
440 public void testApplyState() throws Exception {
442 new JavaTestKit(getSystem()) {
444 String persistenceId = factory.generateActorId("leader-");
446 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
448 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
450 DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
452 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
453 Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
455 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
457 mockRaftActor.waitForInitializeBehaviorComplete();
459 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
460 new MockRaftActorContext.MockPayload("F"));
462 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
464 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
471 public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
472 new JavaTestKit(getSystem()) {{
473 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
474 Props.create(MessageCollectorActor.class));
475 MessageCollectorActor.waitUntilReady(notifierActor);
477 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
478 long heartBeatInterval = 100;
479 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
480 config.setElectionTimeoutFactor(20);
482 String persistenceId = factory.generateActorId("notifier-");
484 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
485 config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
486 new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
489 List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
492 // check if the notifier got a role change from null to Follower
493 RoleChanged raftRoleChanged = matches.get(0);
494 assertEquals(persistenceId, raftRoleChanged.getMemberId());
495 assertNull(raftRoleChanged.getOldRole());
496 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
498 // check if the notifier got a role change from Follower to Candidate
499 raftRoleChanged = matches.get(1);
500 assertEquals(persistenceId, raftRoleChanged.getMemberId());
501 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
502 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
504 // check if the notifier got a role change from Candidate to Leader
505 raftRoleChanged = matches.get(2);
506 assertEquals(persistenceId, raftRoleChanged.getMemberId());
507 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
508 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
510 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
511 notifierActor, LeaderStateChanged.class);
513 assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
514 assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
516 notifierActor.underlyingActor().clear();
518 MockRaftActor raftActor = raftActorRef.underlyingActor();
519 final String newLeaderId = "new-leader";
520 final short newLeaderVersion = 6;
521 Follower follower = new Follower(raftActor.getRaftActorContext()) {
523 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
524 leaderId = newLeaderId;
525 setLeaderPayloadVersion(newLeaderVersion);
530 raftActor.newBehavior(follower);
532 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
533 assertEquals(persistenceId, leaderStateChange.getMemberId());
534 assertEquals(null, leaderStateChange.getLeaderId());
536 raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
537 assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
538 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
540 notifierActor.underlyingActor().clear();
542 raftActor.handleCommand("any");
544 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
545 assertEquals(persistenceId, leaderStateChange.getMemberId());
546 assertEquals(newLeaderId, leaderStateChange.getLeaderId());
547 assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
549 notifierActor.underlyingActor().clear();
551 raftActor.handleCommand("any");
553 Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
554 leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
555 assertNull(leaderStateChange);
560 public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
561 new JavaTestKit(getSystem()) {{
562 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
563 MessageCollectorActor.waitUntilReady(notifierActor);
565 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
566 long heartBeatInterval = 100;
567 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
568 config.setElectionTimeoutFactor(1);
570 String persistenceId = factory.generateActorId("notifier-");
572 factory.createActor(MockRaftActor.builder().id(persistenceId).
573 peerAddresses(ImmutableMap.of("leader", "fake/path")).
574 config(config).roleChangeNotifier(notifierActor).props());
576 List<RoleChanged> matches = null;
577 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
578 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
579 assertNotNull(matches);
580 if(matches.size() == 3) {
583 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
586 assertNotNull(matches);
587 assertEquals(2, matches.size());
589 // check if the notifier got a role change from null to Follower
590 RoleChanged raftRoleChanged = matches.get(0);
591 assertEquals(persistenceId, raftRoleChanged.getMemberId());
592 assertNull(raftRoleChanged.getOldRole());
593 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
595 // check if the notifier got a role change from Follower to Candidate
596 raftRoleChanged = matches.get(1);
597 assertEquals(persistenceId, raftRoleChanged.getMemberId());
598 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
599 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
605 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
606 new JavaTestKit(getSystem()) {
608 String persistenceId = factory.generateActorId("leader-");
609 String follower1Id = factory.generateActorId("follower-");
611 ActorRef followerActor1 =
612 factory.createActor(Props.create(MessageCollectorActor.class));
614 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
615 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
616 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
618 DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
620 Map<String, String> peerAddresses = new HashMap<>();
621 peerAddresses.put(follower1Id, followerActor1.path().toString());
623 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
624 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
626 MockRaftActor leaderActor = mockActorRef.underlyingActor();
628 leaderActor.getRaftActorContext().setCommitIndex(4);
629 leaderActor.getRaftActorContext().setLastApplied(4);
630 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
632 leaderActor.waitForInitializeBehaviorComplete();
634 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
636 Leader leader = new Leader(leaderActor.getRaftActorContext());
637 leaderActor.setCurrentBehavior(leader);
638 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
640 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
641 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
643 assertEquals(8, leaderActor.getReplicatedLog().size());
645 leaderActor.getRaftActorContext().getSnapshotManager()
646 .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
647 new MockRaftActorContext.MockPayload("x")), 4);
649 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
651 assertEquals(8, leaderActor.getReplicatedLog().size());
653 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
654 //fake snapshot on index 5
655 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
657 assertEquals(8, leaderActor.getReplicatedLog().size());
659 //fake snapshot on index 6
660 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
661 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
662 assertEquals(8, leaderActor.getReplicatedLog().size());
664 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
666 assertEquals(8, leaderActor.getReplicatedLog().size());
668 ByteString snapshotBytes = fromObject(Arrays.asList(
669 new MockRaftActorContext.MockPayload("foo-0"),
670 new MockRaftActorContext.MockPayload("foo-1"),
671 new MockRaftActorContext.MockPayload("foo-2"),
672 new MockRaftActorContext.MockPayload("foo-3"),
673 new MockRaftActorContext.MockPayload("foo-4")));
675 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
676 leader, Runtime.getRuntime().totalMemory());
678 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
680 // The commit is needed to complete the snapshot creation process
681 leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
683 // capture snapshot reply should remove the snapshotted entries only
684 assertEquals(3, leaderActor.getReplicatedLog().size());
685 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
687 // add another non-replicated entry
688 leaderActor.getReplicatedLog().append(
689 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
691 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
692 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
693 assertEquals(2, leaderActor.getReplicatedLog().size());
694 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
701 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
702 new JavaTestKit(getSystem()) {
704 String persistenceId = factory.generateActorId("follower-");
705 String leaderId = factory.generateActorId("leader-");
708 ActorRef leaderActor1 =
709 factory.createActor(Props.create(MessageCollectorActor.class));
711 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
712 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
713 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
715 DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
717 Map<String, String> peerAddresses = new HashMap<>();
718 peerAddresses.put(leaderId, leaderActor1.path().toString());
720 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
721 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
723 MockRaftActor followerActor = mockActorRef.underlyingActor();
724 followerActor.getRaftActorContext().setCommitIndex(4);
725 followerActor.getRaftActorContext().setLastApplied(4);
726 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
728 followerActor.waitForInitializeBehaviorComplete();
731 Follower follower = new Follower(followerActor.getRaftActorContext());
732 followerActor.setCurrentBehavior(follower);
733 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
735 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
736 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
737 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
739 // log has indices 0-5
740 assertEquals(6, followerActor.getReplicatedLog().size());
743 followerActor.getRaftActorContext().getSnapshotManager().capture(
744 new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
745 new MockRaftActorContext.MockPayload("D")), 4);
747 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
749 assertEquals(6, followerActor.getReplicatedLog().size());
751 //fake snapshot on index 6
752 List<ReplicatedLogEntry> entries =
754 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
755 new MockRaftActorContext.MockPayload("foo-6"))
757 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
758 assertEquals(7, followerActor.getReplicatedLog().size());
760 //fake snapshot on index 7
761 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
765 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
766 new MockRaftActorContext.MockPayload("foo-7"))
768 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
769 assertEquals(8, followerActor.getReplicatedLog().size());
771 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
774 ByteString snapshotBytes = fromObject(Arrays.asList(
775 new MockRaftActorContext.MockPayload("foo-0"),
776 new MockRaftActorContext.MockPayload("foo-1"),
777 new MockRaftActorContext.MockPayload("foo-2"),
778 new MockRaftActorContext.MockPayload("foo-3"),
779 new MockRaftActorContext.MockPayload("foo-4")));
780 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
781 assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
783 // The commit is needed to complete the snapshot creation process
784 followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
786 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
787 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
788 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
792 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
793 new MockRaftActorContext.MockPayload("foo-7"))
795 // send an additional entry 8 with leaderCommit = 7
796 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
798 // 7 and 8, as lastapplied is 7
799 assertEquals(2, followerActor.getReplicatedLog().size());
806 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
807 new JavaTestKit(getSystem()) {
809 String persistenceId = factory.generateActorId("leader-");
810 String follower1Id = factory.generateActorId("follower-");
811 String follower2Id = factory.generateActorId("follower-");
813 ActorRef followerActor1 =
814 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
815 ActorRef followerActor2 =
816 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
818 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
819 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
820 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
822 DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
824 Map<String, String> peerAddresses = new HashMap<>();
825 peerAddresses.put(follower1Id, followerActor1.path().toString());
826 peerAddresses.put(follower2Id, followerActor2.path().toString());
828 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
829 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
831 MockRaftActor leaderActor = mockActorRef.underlyingActor();
832 leaderActor.getRaftActorContext().setCommitIndex(9);
833 leaderActor.getRaftActorContext().setLastApplied(9);
834 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
836 leaderActor.waitForInitializeBehaviorComplete();
838 Leader leader = new Leader(leaderActor.getRaftActorContext());
839 leaderActor.setCurrentBehavior(leader);
840 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
842 // create 5 entries in the log
843 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
844 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
846 //set the snapshot index to 4 , 0 to 4 are snapshotted
847 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
848 //setting replicatedToAllIndex = 9, for the log to clear
849 leader.setReplicatedToAllIndex(9);
850 assertEquals(5, leaderActor.getReplicatedLog().size());
851 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
853 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
854 assertEquals(5, leaderActor.getReplicatedLog().size());
855 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
857 // set the 2nd follower nextIndex to 1 which has been snapshotted
858 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
859 assertEquals(5, leaderActor.getReplicatedLog().size());
860 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
862 // simulate a real snapshot
863 leaderActor.onReceiveCommand(new SendHeartBeat());
864 assertEquals(5, leaderActor.getReplicatedLog().size());
865 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
866 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
867 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
870 //reply from a slow follower does not initiate a fake snapshot
871 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
872 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
874 ByteString snapshotBytes = fromObject(Arrays.asList(
875 new MockRaftActorContext.MockPayload("foo-0"),
876 new MockRaftActorContext.MockPayload("foo-1"),
877 new MockRaftActorContext.MockPayload("foo-2"),
878 new MockRaftActorContext.MockPayload("foo-3"),
879 new MockRaftActorContext.MockPayload("foo-4")));
880 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
881 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
883 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
885 //reply from a slow follower after should not raise errors
886 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
887 assertEquals(0, leaderActor.getReplicatedLog().size());
893 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
894 new JavaTestKit(getSystem()) {{
895 String persistenceId = factory.generateActorId("leader-");
896 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
897 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
898 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
899 config.setSnapshotBatchCount(5);
901 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
903 Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
905 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
906 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
908 MockRaftActor leaderActor = mockActorRef.underlyingActor();
909 leaderActor.getRaftActorContext().setCommitIndex(3);
910 leaderActor.getRaftActorContext().setLastApplied(3);
911 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
913 leaderActor.waitForInitializeBehaviorComplete();
914 for(int i=0;i< 4;i++) {
915 leaderActor.getReplicatedLog()
916 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
917 new MockRaftActorContext.MockPayload("A")));
920 Leader leader = new Leader(leaderActor.getRaftActorContext());
921 leaderActor.setCurrentBehavior(leader);
922 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
924 // Simulate an install snaphost to a follower.
925 leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
926 leaderActor.getReplicatedLog().last(), -1, "member1");
928 // Now send a CaptureSnapshotReply
929 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
931 // Trimming log in this scenario is a no-op
932 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
933 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
934 assertEquals(-1, leader.getReplicatedToAllIndex());
940 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
941 new JavaTestKit(getSystem()) {{
942 String persistenceId = factory.generateActorId("leader-");
943 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
944 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
945 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
946 config.setSnapshotBatchCount(5);
948 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
950 Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
952 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
953 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
955 MockRaftActor leaderActor = mockActorRef.underlyingActor();
956 leaderActor.getRaftActorContext().setCommitIndex(3);
957 leaderActor.getRaftActorContext().setLastApplied(3);
958 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
959 leaderActor.getReplicatedLog().setSnapshotIndex(3);
961 leaderActor.waitForInitializeBehaviorComplete();
962 Leader leader = new Leader(leaderActor.getRaftActorContext());
963 leaderActor.setCurrentBehavior(leader);
964 leader.setReplicatedToAllIndex(3);
965 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
967 // Persist another entry (this will cause a CaptureSnapshot to be triggered
968 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
970 // Now send a CaptureSnapshotReply
971 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
973 // Trimming log in this scenario is a no-op
974 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
975 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
976 assertEquals(3, leader.getReplicatedToAllIndex());
982 public void testRaftActorOnRecoverySnapshot() throws Exception {
983 TEST_LOG.info("testRaftActorOnRecoverySnapshot");
985 new JavaTestKit(getSystem()) {{
986 String persistenceId = factory.generateActorId("follower-");
988 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
990 // Set the heartbeat interval high to essentially disable election otherwise the test
991 // may fail if the actor is switched to Leader
992 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
994 ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
996 // Create mock ReplicatedLogEntry
997 ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1,
998 new MockRaftActorContext.MockPayload("F", 1));
1000 InMemoryJournal.addEntry(persistenceId, 1, replLogEntry);
1002 TestActorRef<MockRaftActor> ref = factory.createTestActor(
1003 MockRaftActor.props(persistenceId, peerAddresses, config));
1005 MockRaftActor mockRaftActor = ref.underlyingActor();
1007 mockRaftActor.waitForRecoveryComplete();
1009 mockRaftActor.waitForInitializeBehaviorComplete();
1011 verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
1016 public void testSwitchBehavior(){
1017 String persistenceId = factory.generateActorId("leader-");
1018 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1019 config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
1020 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1021 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1022 config.setSnapshotBatchCount(5);
1024 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1026 Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
1028 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1029 MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
1031 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1033 leaderActor.waitForRecoveryComplete();
1035 leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
1037 assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1038 assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
1040 leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
1042 assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1043 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1045 leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
1047 assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1048 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1050 leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
1052 assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1053 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1056 public static ByteString fromObject(Object snapshot) throws Exception {
1057 ByteArrayOutputStream b = null;
1058 ObjectOutputStream o = null;
1060 b = new ByteArrayOutputStream();
1061 o = new ObjectOutputStream(b);
1062 o.writeObject(snapshot);
1063 byte[] snapshotBytes = b.toByteArray();
1064 return ByteString.copyFrom(snapshotBytes);
1077 public void testUpdateConfigParam() throws Exception {
1078 DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
1079 String persistenceId = factory.generateActorId("follower-");
1080 ImmutableMap<String, String> peerAddresses =
1081 ImmutableMap.<String, String>builder().put("member1", "address").build();
1082 DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
1084 TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
1085 MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId);
1086 MockRaftActor mockRaftActor = actorRef.underlyingActor();
1087 mockRaftActor.waitForInitializeBehaviorComplete();
1089 RaftActorBehavior behavior = mockRaftActor.getCurrentBehavior();
1090 mockRaftActor.updateConfigParams(emptyConfig);
1091 assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1092 assertEquals("Behavior State", RaftState.Follower,
1093 mockRaftActor.getCurrentBehavior().state());
1095 DefaultConfigParamsImpl disableConfig = new DefaultConfigParamsImpl();
1096 disableConfig.setCustomRaftPolicyImplementationClass(
1097 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
1098 mockRaftActor.updateConfigParams(disableConfig);
1099 assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
1100 assertEquals("Behavior State", RaftState.Follower,
1101 mockRaftActor.getCurrentBehavior().state());
1103 behavior = mockRaftActor.getCurrentBehavior();
1104 mockRaftActor.updateConfigParams(disableConfig);
1105 assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1106 assertEquals("Behavior State", RaftState.Follower,
1107 mockRaftActor.getCurrentBehavior().state());
1109 DefaultConfigParamsImpl defaultConfig = new DefaultConfigParamsImpl();
1110 defaultConfig.setCustomRaftPolicyImplementationClass(
1111 "org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy");
1112 mockRaftActor.updateConfigParams(defaultConfig);
1113 assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
1114 assertEquals("Behavior State", RaftState.Follower,
1115 mockRaftActor.getCurrentBehavior().state());
1117 behavior = mockRaftActor.getCurrentBehavior();
1118 mockRaftActor.updateConfigParams(defaultConfig);
1119 assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1120 assertEquals("Behavior State", RaftState.Follower,
1121 mockRaftActor.getCurrentBehavior().state());
1125 public void testGetSnapshot() throws Exception {
1126 TEST_LOG.info("testGetSnapshot starting");
1128 JavaTestKit kit = new JavaTestKit(getSystem());
1130 String persistenceId = factory.generateActorId("test-actor-");
1131 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1132 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1136 InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
1137 InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0,
1138 new MockRaftActorContext.MockPayload("A")));
1139 InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1,
1140 new MockRaftActorContext.MockPayload("B")));
1141 InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
1142 InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2,
1143 new MockRaftActorContext.MockPayload("C")));
1145 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
1146 ImmutableMap.<String, String>builder().put("member1", "address").build(), config).
1147 withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1148 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1150 mockRaftActor.waitForRecoveryComplete();
1152 // Wait for snapshot after recovery
1153 verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
1155 mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
1156 doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
1158 raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1160 ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
1161 verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture());
1163 byte[] stateSnapshot = new byte[]{1,2,3};
1164 replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender());
1166 GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
1168 assertEquals("getId", persistenceId, reply.getId());
1169 Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
1170 assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1171 assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1172 assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
1173 assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
1174 assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
1175 assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
1176 assertArrayEquals("getState", stateSnapshot, replySnapshot.getState());
1177 assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
1178 assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
1180 // Test with timeout
1182 mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS));
1183 reset(mockRaftActor.snapshotCohortDelegate);
1184 doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
1186 raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1187 Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
1188 assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
1190 mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS));
1192 // Test with persistence disabled.
1194 mockRaftActor.setPersistence(false);
1195 reset(mockRaftActor.snapshotCohortDelegate);
1196 doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
1198 raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1199 reply = kit.expectMsgClass(GetSnapshotReply.class);
1200 verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class));
1202 assertEquals("getId", persistenceId, reply.getId());
1203 replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
1204 assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1205 assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1206 assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
1207 assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
1208 assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
1209 assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
1210 assertEquals("getState length", 0, replySnapshot.getState().length);
1211 assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
1213 TEST_LOG.info("testGetSnapshot ending");
1217 public void testRestoreFromSnapshot() throws Exception {
1218 TEST_LOG.info("testRestoreFromSnapshot starting");
1220 String persistenceId = factory.generateActorId("test-actor-");
1221 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1222 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1224 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
1225 snapshotUnappliedEntries.add(new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
1226 new MockRaftActorContext.MockPayload("E")));
1228 int snapshotLastApplied = 3;
1229 int snapshotLastIndex = 4;
1231 List<MockPayload> state = Arrays.asList(
1232 new MockRaftActorContext.MockPayload("A"),
1233 new MockRaftActorContext.MockPayload("B"),
1234 new MockRaftActorContext.MockPayload("C"),
1235 new MockRaftActorContext.MockPayload("D"));
1236 ByteString stateBytes = fromObject(state);
1238 Snapshot snapshot = Snapshot.create(stateBytes.toByteArray(), snapshotUnappliedEntries,
1239 snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1");
1241 InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
1243 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1244 config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
1245 withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1246 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1248 mockRaftActor.waitForRecoveryComplete();
1250 Snapshot savedSnapshot = InMemorySnapshotStore.waitForSavedSnapshot(persistenceId, Snapshot.class);
1251 assertEquals("getElectionTerm", snapshot.getElectionTerm(), savedSnapshot.getElectionTerm());
1252 assertEquals("getElectionVotedFor", snapshot.getElectionVotedFor(), savedSnapshot.getElectionVotedFor());
1253 assertEquals("getLastAppliedIndex", snapshot.getLastAppliedIndex(), savedSnapshot.getLastAppliedIndex());
1254 assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm());
1255 assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex());
1256 assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm());
1257 assertArrayEquals("getState", snapshot.getState(), savedSnapshot.getState());
1258 assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries());
1260 verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(byte[].class));
1262 RaftActorContext context = mockRaftActor.getRaftActorContext();
1263 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1264 assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex());
1265 assertEquals("Last applied", snapshotLastApplied, context.getLastApplied());
1266 assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex());
1267 assertEquals("Recovered state", state, mockRaftActor.getState());
1268 assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm());
1269 assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1271 // Test with data persistence disabled
1273 snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
1274 -1, -1, -1, -1, 5, "member-1");
1276 persistenceId = factory.generateActorId("test-actor-");
1278 raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1279 config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).
1280 persistent(Optional.of(Boolean.FALSE)).props().
1281 withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1282 mockRaftActor = raftActorRef.underlyingActor();
1284 mockRaftActor.waitForRecoveryComplete();
1285 assertEquals("snapshot committed", true,
1286 Uninterruptibles.awaitUninterruptibly(mockRaftActor.snapshotCommitted, 5, TimeUnit.SECONDS));
1288 context = mockRaftActor.getRaftActorContext();
1289 assertEquals("Current term", 5L, context.getTermInformation().getCurrentTerm());
1290 assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
1292 TEST_LOG.info("testRestoreFromSnapshot ending");
1296 public void testRestoreFromSnapshotWithRecoveredData() throws Exception {
1297 TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting");
1299 String persistenceId = factory.generateActorId("test-actor-");
1300 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1301 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1303 List<MockPayload> state = Arrays.asList(new MockRaftActorContext.MockPayload("A"));
1304 Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.<ReplicatedLogEntry>asList(),
1305 5, 2, 5, 2, 2, "member-1");
1307 InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
1308 new MockRaftActorContext.MockPayload("B")));
1310 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1311 config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
1312 withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1313 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1315 mockRaftActor.waitForRecoveryComplete();
1317 verify(mockRaftActor.snapshotCohortDelegate, timeout(500).never()).applySnapshot(any(byte[].class));
1319 RaftActorContext context = mockRaftActor.getRaftActorContext();
1320 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
1321 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
1322 assertEquals("Last applied", -1, context.getLastApplied());
1323 assertEquals("Commit index", -1, context.getCommitIndex());
1324 assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
1325 assertEquals("Voted for", null, context.getTermInformation().getVotedFor());
1327 TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");
1331 public void testNonVotingOnRecovery() throws Exception {
1332 TEST_LOG.info("testNonVotingOnRecovery starting");
1334 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1335 config.setElectionTimeoutFactor(1);
1336 config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS));
1338 String persistenceId = factory.generateActorId("test-actor-");
1339 InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
1340 new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false)))));
1342 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1343 config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1344 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1346 mockRaftActor.waitForInitializeBehaviorComplete();
1348 // Sleep a bit and verify it didn't get an election timeout and schedule an election.
1350 Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
1351 assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
1353 TEST_LOG.info("testNonVotingOnRecovery ending");
1357 public void testLeaderTransitioning() throws Exception {
1358 TEST_LOG.info("testLeaderTransitioning starting");
1360 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
1361 Props.create(MessageCollectorActor.class));
1363 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1364 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1366 String persistenceId = factory.generateActorId("test-actor-");
1368 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
1369 config(config).roleChangeNotifier(notifierActor).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1370 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1372 mockRaftActor.waitForInitializeBehaviorComplete();
1374 raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.<ReplicatedLogEntry>emptyList(),
1375 0L, -1L, (short)1), ActorRef.noSender());
1376 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1377 notifierActor, LeaderStateChanged.class);
1378 assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
1380 MessageCollectorActor.clearMessages(notifierActor);
1382 raftActorRef.tell(new LeaderTransitioning(), ActorRef.noSender());
1384 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1385 assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
1386 assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
1388 TEST_LOG.info("testLeaderTransitioning ending");