2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static org.junit.Assert.assertArrayEquals;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNotSame;
15 import static org.junit.Assert.assertNull;
16 import static org.junit.Assert.assertSame;
17 import static org.junit.Assert.assertTrue;
18 import static org.mockito.Matchers.any;
19 import static org.mockito.Matchers.anyObject;
20 import static org.mockito.Matchers.eq;
21 import static org.mockito.Matchers.same;
22 import static org.mockito.Mockito.doReturn;
23 import static org.mockito.Mockito.mock;
24 import static org.mockito.Mockito.never;
25 import static org.mockito.Mockito.reset;
26 import static org.mockito.Mockito.timeout;
27 import static org.mockito.Mockito.verify;
28 import akka.actor.ActorRef;
29 import akka.actor.PoisonPill;
30 import akka.actor.Props;
31 import akka.actor.Status.Failure;
32 import akka.actor.Terminated;
33 import akka.dispatch.Dispatchers;
34 import akka.japi.Procedure;
35 import akka.persistence.SaveSnapshotFailure;
36 import akka.persistence.SaveSnapshotSuccess;
37 import akka.persistence.SnapshotMetadata;
38 import akka.persistence.SnapshotOffer;
39 import akka.testkit.JavaTestKit;
40 import akka.testkit.TestActorRef;
41 import com.google.common.base.Optional;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.util.concurrent.Uninterruptibles;
44 import com.google.protobuf.ByteString;
45 import java.io.ByteArrayOutputStream;
46 import java.io.ObjectOutputStream;
47 import java.util.ArrayList;
48 import java.util.Arrays;
49 import java.util.Collections;
50 import java.util.HashMap;
51 import java.util.List;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.TimeoutException;
55 import org.apache.commons.lang3.SerializationUtils;
56 import org.junit.After;
57 import org.junit.Before;
58 import org.junit.Test;
59 import org.mockito.ArgumentCaptor;
60 import org.opendaylight.controller.cluster.DataPersistenceProvider;
61 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
62 import org.opendaylight.controller.cluster.PersistentDataProvider;
63 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
64 import org.opendaylight.controller.cluster.notifications.RoleChanged;
65 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
66 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
67 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
68 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
69 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
70 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
71 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
72 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
73 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
74 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
75 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
76 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
77 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
78 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
79 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
80 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
81 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
82 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
83 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
84 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
85 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
88 import scala.concurrent.duration.Duration;
89 import scala.concurrent.duration.FiniteDuration;
91 public class RaftActorTest extends AbstractActorTest {
93 static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
95 private TestActorFactory factory;
99 factory = new TestActorFactory(getSystem());
103 public void tearDown() throws Exception {
105 InMemoryJournal.clear();
106 InMemorySnapshotStore.clear();
110 public void testConstruction() {
111 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
115 public void testFindLeaderWhenLeaderIsSelf(){
116 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
117 kit.waitUntilLeader();
122 public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
123 TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
125 new JavaTestKit(getSystem()) {{
126 String persistenceId = factory.generateActorId("follower-");
128 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
130 // Set the heartbeat interval high to essentially disable election otherwise the test
131 // may fail if the actor is switched to Leader and the commitIndex is set to the last
133 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
135 ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
136 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
137 peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
139 watch(followerActor);
141 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
142 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
143 new MockRaftActorContext.MockPayload("E"));
144 snapshotUnappliedEntries.add(entry1);
146 int lastAppliedDuringSnapshotCapture = 3;
147 int lastIndexDuringSnapshotCapture = 4;
149 // 4 messages as part of snapshot, which are applied to state
150 ByteString snapshotBytes = fromObject(Arrays.asList(
151 new MockRaftActorContext.MockPayload("A"),
152 new MockRaftActorContext.MockPayload("B"),
153 new MockRaftActorContext.MockPayload("C"),
154 new MockRaftActorContext.MockPayload("D")));
156 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
157 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
158 lastAppliedDuringSnapshotCapture, 1);
159 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
161 // add more entries after snapshot is taken
162 List<ReplicatedLogEntry> entries = new ArrayList<>();
163 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
164 new MockRaftActorContext.MockPayload("F", 2));
165 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
166 new MockRaftActorContext.MockPayload("G", 3));
167 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
168 new MockRaftActorContext.MockPayload("H", 4));
173 int lastAppliedToState = 5;
176 InMemoryJournal.addEntry(persistenceId, 5, entry2);
177 // 2 entries are applied to state besides the 4 entries in snapshot
178 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
179 InMemoryJournal.addEntry(persistenceId, 7, entry3);
180 InMemoryJournal.addEntry(persistenceId, 8, entry4);
183 followerActor.tell(PoisonPill.getInstance(), null);
184 expectMsgClass(duration("5 seconds"), Terminated.class);
186 unwatch(followerActor);
188 //reinstate the actor
189 TestActorRef<MockRaftActor> ref = factory.createTestActor(
190 MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
192 MockRaftActor mockRaftActor = ref.underlyingActor();
194 mockRaftActor.waitForRecoveryComplete();
196 RaftActorContext context = mockRaftActor.getRaftActorContext();
197 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
198 context.getReplicatedLog().size());
199 assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
200 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
201 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
202 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
203 assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
205 mockRaftActor.waitForInitializeBehaviorComplete();
207 assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
210 TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
214 public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
215 new JavaTestKit(getSystem()) {{
216 String persistenceId = factory.generateActorId("follower-");
218 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
220 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
222 TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
223 ImmutableMap.<String, String>builder().put("member1", "address").build(),
224 Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
226 MockRaftActor mockRaftActor = ref.underlyingActor();
228 mockRaftActor.waitForRecoveryComplete();
230 mockRaftActor.waitForInitializeBehaviorComplete();
232 assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
237 public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
238 new JavaTestKit(getSystem()) {{
239 String persistenceId = factory.generateActorId("follower-");
240 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
241 config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
242 config.setElectionTimeoutFactor(1);
244 InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
246 TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
247 ImmutableMap.<String, String>builder().put("member1", "address").build(),
248 Optional.<ConfigParams>of(config), new NonPersistentDataProvider()).
249 withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
251 InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
252 List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
253 assertEquals("UpdateElectionTerm entries", 1, entries.size());
254 UpdateElectionTerm updateEntry = entries.get(0);
256 factory.killActor(ref, this);
258 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
259 ref = factory.createTestActor(MockRaftActor.props(persistenceId,
260 ImmutableMap.<String, String>builder().put("member1", "address").build(),
261 Optional.<ConfigParams>of(config), new NonPersistentDataProvider()).
262 withDispatcher(Dispatchers.DefaultDispatcherId()),
263 factory.generateActorId("follower-"));
265 MockRaftActor actor = ref.underlyingActor();
266 actor.waitForRecoveryComplete();
268 RaftActorContext newContext = actor.getRaftActorContext();
269 assertEquals("electionTerm", updateEntry.getCurrentTerm(),
270 newContext.getTermInformation().getCurrentTerm());
271 assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
273 entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
274 assertEquals("UpdateElectionTerm entries", 1, entries.size());
279 public void testRaftActorForwardsToRaftActorRecoverySupport() {
280 String persistenceId = factory.generateActorId("leader-");
282 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
284 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
286 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
287 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
289 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
291 // Wait for akka's recovery to complete so it doesn't interfere.
292 mockRaftActor.waitForRecoveryComplete();
294 RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
295 mockRaftActor.setRaftActorRecoverySupport(mockSupport );
297 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
298 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
299 mockRaftActor.handleRecover(snapshotOffer);
301 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
302 1, new MockRaftActorContext.MockPayload("1", 5));
303 mockRaftActor.handleRecover(logEntry);
305 ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
306 mockRaftActor.handleRecover(applyJournalEntries);
308 ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
309 mockRaftActor.handleRecover(applyLogEntries);
311 DeleteEntries deleteEntries = new DeleteEntries(1);
312 mockRaftActor.handleRecover(deleteEntries);
314 org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
315 new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
316 mockRaftActor.handleRecover(deprecatedDeleteEntries);
318 UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
319 mockRaftActor.handleRecover(updateElectionTerm);
321 org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
322 new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
323 mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
325 verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
326 verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
327 verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
328 verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class));
329 verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
330 verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
331 verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
332 verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class));
336 public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
337 String persistenceId = factory.generateActorId("leader-");
339 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
341 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
343 RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
345 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
346 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), mockSupport), persistenceId);
348 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
350 // Wait for akka's recovery to complete so it doesn't interfere.
351 mockRaftActor.waitForRecoveryComplete();
353 ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
354 doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
355 mockRaftActor.handleCommand(applySnapshot);
357 CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
358 doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
359 mockRaftActor.handleCommand(captureSnapshot);
361 CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
362 doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
363 mockRaftActor.handleCommand(captureSnapshotReply);
365 SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
366 doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
367 mockRaftActor.handleCommand(saveSnapshotSuccess);
369 SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
370 doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
371 mockRaftActor.handleCommand(saveSnapshotFailure);
373 doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
374 any(ActorRef.class));
375 mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
377 doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
378 mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
380 verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
381 verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
382 verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
383 verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
384 verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
385 verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
386 any(ActorRef.class));
387 verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
391 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
392 new JavaTestKit(getSystem()) {
394 String persistenceId = factory.generateActorId("leader-");
396 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
398 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
400 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
402 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
403 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
405 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
407 mockRaftActor.waitForInitializeBehaviorComplete();
409 mockRaftActor.waitUntilLeader();
411 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
413 verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
421 public void testApplyState() throws Exception {
423 new JavaTestKit(getSystem()) {
425 String persistenceId = factory.generateActorId("leader-");
427 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
429 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
431 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
433 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
434 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
436 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
438 mockRaftActor.waitForInitializeBehaviorComplete();
440 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
441 new MockRaftActorContext.MockPayload("F"));
443 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
445 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
452 public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
453 new JavaTestKit(getSystem()) {{
454 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
455 Props.create(MessageCollectorActor.class));
456 MessageCollectorActor.waitUntilReady(notifierActor);
458 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
459 long heartBeatInterval = 100;
460 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
461 config.setElectionTimeoutFactor(20);
463 String persistenceId = factory.generateActorId("notifier-");
465 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
466 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
467 new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
469 List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
472 // check if the notifier got a role change from null to Follower
473 RoleChanged raftRoleChanged = matches.get(0);
474 assertEquals(persistenceId, raftRoleChanged.getMemberId());
475 assertNull(raftRoleChanged.getOldRole());
476 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
478 // check if the notifier got a role change from Follower to Candidate
479 raftRoleChanged = matches.get(1);
480 assertEquals(persistenceId, raftRoleChanged.getMemberId());
481 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
482 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
484 // check if the notifier got a role change from Candidate to Leader
485 raftRoleChanged = matches.get(2);
486 assertEquals(persistenceId, raftRoleChanged.getMemberId());
487 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
488 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
490 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
491 notifierActor, LeaderStateChanged.class);
493 assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
494 assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
496 notifierActor.underlyingActor().clear();
498 MockRaftActor raftActor = raftActorRef.underlyingActor();
499 final String newLeaderId = "new-leader";
500 final short newLeaderVersion = 6;
501 Follower follower = new Follower(raftActor.getRaftActorContext()) {
503 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
504 leaderId = newLeaderId;
505 setLeaderPayloadVersion(newLeaderVersion);
510 raftActor.newBehavior(follower);
512 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
513 assertEquals(persistenceId, leaderStateChange.getMemberId());
514 assertEquals(null, leaderStateChange.getLeaderId());
516 raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
517 assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
518 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
520 notifierActor.underlyingActor().clear();
522 raftActor.handleCommand("any");
524 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
525 assertEquals(persistenceId, leaderStateChange.getMemberId());
526 assertEquals(newLeaderId, leaderStateChange.getLeaderId());
527 assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
529 notifierActor.underlyingActor().clear();
531 raftActor.handleCommand("any");
533 Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
534 leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
535 assertNull(leaderStateChange);
540 public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
541 new JavaTestKit(getSystem()) {{
542 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
543 MessageCollectorActor.waitUntilReady(notifierActor);
545 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
546 long heartBeatInterval = 100;
547 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
548 config.setElectionTimeoutFactor(1);
550 String persistenceId = factory.generateActorId("notifier-");
552 factory.createActor(MockRaftActor.props(persistenceId,
553 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
555 List<RoleChanged> matches = null;
556 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
557 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
558 assertNotNull(matches);
559 if(matches.size() == 3) {
562 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
565 assertNotNull(matches);
566 assertEquals(2, matches.size());
568 // check if the notifier got a role change from null to Follower
569 RoleChanged raftRoleChanged = matches.get(0);
570 assertEquals(persistenceId, raftRoleChanged.getMemberId());
571 assertNull(raftRoleChanged.getOldRole());
572 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
574 // check if the notifier got a role change from Follower to Candidate
575 raftRoleChanged = matches.get(1);
576 assertEquals(persistenceId, raftRoleChanged.getMemberId());
577 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
578 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
584 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
585 new JavaTestKit(getSystem()) {
587 String persistenceId = factory.generateActorId("leader-");
588 String follower1Id = factory.generateActorId("follower-");
590 ActorRef followerActor1 =
591 factory.createActor(Props.create(MessageCollectorActor.class));
593 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
594 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
595 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
597 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
599 Map<String, String> peerAddresses = new HashMap<>();
600 peerAddresses.put(follower1Id, followerActor1.path().toString());
602 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
603 MockRaftActor.props(persistenceId, peerAddresses,
604 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
606 MockRaftActor leaderActor = mockActorRef.underlyingActor();
608 leaderActor.getRaftActorContext().setCommitIndex(4);
609 leaderActor.getRaftActorContext().setLastApplied(4);
610 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
612 leaderActor.waitForInitializeBehaviorComplete();
614 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
616 Leader leader = new Leader(leaderActor.getRaftActorContext());
617 leaderActor.setCurrentBehavior(leader);
618 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
620 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
621 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
623 assertEquals(8, leaderActor.getReplicatedLog().size());
625 leaderActor.getRaftActorContext().getSnapshotManager()
626 .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
627 new MockRaftActorContext.MockPayload("x")), 4);
629 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
631 assertEquals(8, leaderActor.getReplicatedLog().size());
633 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
634 //fake snapshot on index 5
635 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
637 assertEquals(8, leaderActor.getReplicatedLog().size());
639 //fake snapshot on index 6
640 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
641 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
642 assertEquals(8, leaderActor.getReplicatedLog().size());
644 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
646 assertEquals(8, leaderActor.getReplicatedLog().size());
648 ByteString snapshotBytes = fromObject(Arrays.asList(
649 new MockRaftActorContext.MockPayload("foo-0"),
650 new MockRaftActorContext.MockPayload("foo-1"),
651 new MockRaftActorContext.MockPayload("foo-2"),
652 new MockRaftActorContext.MockPayload("foo-3"),
653 new MockRaftActorContext.MockPayload("foo-4")));
655 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
656 leader, Runtime.getRuntime().totalMemory());
658 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
660 // The commit is needed to complete the snapshot creation process
661 leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
663 // capture snapshot reply should remove the snapshotted entries only
664 assertEquals(3, leaderActor.getReplicatedLog().size());
665 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
667 // add another non-replicated entry
668 leaderActor.getReplicatedLog().append(
669 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
671 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
672 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
673 assertEquals(2, leaderActor.getReplicatedLog().size());
674 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
// Drives a follower MockRaftActor through AppendEntries messages that arrive while a snapshot
// capture started through the SnapshotManager is still outstanding, and checks the replicated
// log size at each step, including after the CaptureSnapshotReply and the explicit commit.
681 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
682 new JavaTestKit(getSystem()) {
// Ids for the follower under test and for its single peer.
684 String persistenceId = factory.generateActorId("follower-");
685 String leaderId = factory.generateActorId("leader-");
// The peer is only a MessageCollectorActor; it is registered below under leaderId.
688 ActorRef leaderActor1 =
689 factory.createActor(Props.create(MessageCollectorActor.class));
// Day-long intervals, presumably so that no heartbeat/election timer fires during the test.
691 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
692 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
693 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Persistence is replaced by a Mockito mock.
695 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
697 Map<String, String> peerAddresses = new HashMap<>();
698 peerAddresses.put(leaderId, leaderActor1.path().toString());
700 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
701 MockRaftActor.props(persistenceId, peerAddresses,
702 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
// Preset the context: commitIndex = lastApplied = 4, current term 1.
704 MockRaftActor followerActor = mockActorRef.underlyingActor();
705 followerActor.getRaftActorContext().setCommitIndex(4);
706 followerActor.getRaftActorContext().setLastApplied(4);
707 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
709 followerActor.waitForInitializeBehaviorComplete();
// Install a fresh Follower behavior; the same instance is later passed to commit().
712 Follower follower = new Follower(followerActor.getRaftActorContext());
713 followerActor.setCurrentBehavior(follower);
714 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
716 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
717 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
718 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
720 // log has indices 0-5
721 assertEquals(6, followerActor.getReplicatedLog().size());
// Start the snapshot capture with an entry at term 1 / index 5 and the value 4
// (presumably the last log entry and replicatedToAllIndex - confirm against SnapshotManager.capture).
724 followerActor.getRaftActorContext().getSnapshotManager().capture(
725 new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
726 new MockRaftActorContext.MockPayload("D")), 4);
// The capture must have asked the cohort delegate to create a snapshot.
728 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
// Starting the capture alone does not change the log.
730 assertEquals(6, followerActor.getReplicatedLog().size());
732 //fake snapshot on index 6
733 List<ReplicatedLogEntry> entries =
735 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
736 new MockRaftActorContext.MockPayload("foo-6"))
738 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
// The entry is appended and nothing is trimmed while the capture is in progress.
739 assertEquals(7, followerActor.getReplicatedLog().size());
741 //fake snapshot on index 7
742 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
746 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
747 new MockRaftActorContext.MockPayload("foo-7"))
749 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
750 assertEquals(8, followerActor.getReplicatedLog().size());
752 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
// Deliver the reply for the pending capture; the snapshot state is a serialized list of five payloads.
755 ByteString snapshotBytes = fromObject(Arrays.asList(
756 new MockRaftActorContext.MockPayload("foo-0"),
757 new MockRaftActorContext.MockPayload("foo-1"),
758 new MockRaftActorContext.MockPayload("foo-2"),
759 new MockRaftActorContext.MockPayload("foo-3"),
760 new MockRaftActorContext.MockPayload("foo-4")));
761 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
// The manager still reports capturing after the reply; it is committed explicitly below.
762 assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
764 // The commit is needed to complete the snapshot creation process
765 followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
767 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
768 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
769 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
// NOTE(review): the entry at index 8 reuses the payload label "foo-7"; this looks like a
// copy-paste leftover and does not affect the size assertions - confirm before changing.
773 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
774 new MockRaftActorContext.MockPayload("foo-7"))
776 // send an additional entry 8 with leaderCommit = 7
777 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
779 // only entries 7 and 8 remain, as lastApplied is 7
780 assertEquals(2, followerActor.getReplicatedLog().size());
// Drives a leader MockRaftActor with two MessageCollectorActor peers through AppendEntriesReply
// and SendHeartBeat messages, checking that the replicated log keeps its 5 entries until the
// CaptureSnapshotReply is processed, after which the log is expected to be empty.
787 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
788 new JavaTestKit(getSystem()) {
// Ids for the leader under test and its two followers.
790 String persistenceId = factory.generateActorId("leader-");
791 String follower1Id = factory.generateActorId("follower-");
792 String follower2Id = factory.generateActorId("follower-");
// Both followers are plain MessageCollectorActor instances.
794 ActorRef followerActor1 =
795 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
796 ActorRef followerActor2 =
797 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
// Day-long intervals, presumably so that no heartbeat/election timer fires during the test.
799 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
800 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
801 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Persistence is replaced by a Mockito mock.
803 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
805 Map<String, String> peerAddresses = new HashMap<>();
806 peerAddresses.put(follower1Id, followerActor1.path().toString());
807 peerAddresses.put(follower2Id, followerActor2.path().toString());
809 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
810 MockRaftActor.props(persistenceId, peerAddresses,
811 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
// Preset the context: commitIndex = lastApplied = 9, current term 1.
813 MockRaftActor leaderActor = mockActorRef.underlyingActor();
814 leaderActor.getRaftActorContext().setCommitIndex(9);
815 leaderActor.getRaftActorContext().setLastApplied(9);
816 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
818 leaderActor.waitForInitializeBehaviorComplete();
// Install a fresh Leader behavior so the test can set replicatedToAllIndex on it directly.
820 Leader leader = new Leader(leaderActor.getRaftActorContext());
821 leaderActor.setCurrentBehavior(leader);
822 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
824 // create 5 entries in the log
825 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
826 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
828 //set the snapshot index to 4 , 0 to 4 are snapshotted
829 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
830 //setting replicatedToAllIndex = 9, for the log to clear
831 leader.setReplicatedToAllIndex(9);
832 assertEquals(5, leaderActor.getReplicatedLog().size());
833 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Reply from the first follower carrying index 9; the log must stay untouched.
835 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
836 assertEquals(5, leaderActor.getReplicatedLog().size());
837 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
839 // set the 2nd follower nextIndex to 1 which has been snapshotted
840 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
841 assertEquals(5, leaderActor.getReplicatedLog().size());
842 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
844 // simulate a real snapshot
845 leaderActor.onReceiveCommand(new SendHeartBeat());
846 assertEquals(5, leaderActor.getReplicatedLog().size());
// The failure message includes the actual state and the current leader id to ease diagnosis.
847 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
848 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
849 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
852 //reply from a slow follower does not initiate a fake snapshot
853 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
854 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
// Deliver the reply for the pending capture; the snapshot state is a serialized list of five payloads.
856 ByteString snapshotBytes = fromObject(Arrays.asList(
857 new MockRaftActorContext.MockPayload("foo-0"),
858 new MockRaftActorContext.MockPayload("foo-1"),
859 new MockRaftActorContext.MockPayload("foo-2"),
860 new MockRaftActorContext.MockPayload("foo-3"),
861 new MockRaftActorContext.MockPayload("foo-4")));
862 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
// The manager still reports capturing, yet the log has already been cleared.
863 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
865 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
867 //reply from a slow follower after should not raise errors
868 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
869 assertEquals(0, leaderActor.getReplicatedLog().size());
// Checks that processing a CaptureSnapshotReply on a leader whose replicatedToAllIndex is -1
// leaves the log's snapshot index and the leader's replicatedToAllIndex at -1.
875 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
876 new JavaTestKit(getSystem()) {{
877 String persistenceId = factory.generateActorId("leader-");
// Day-long intervals, presumably so that no heartbeat/election timer fires during the test.
878 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
879 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
880 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
881 config.setSnapshotBatchCount(5);
// Uses a NonPersistentDataProvider rather than a mock.
883 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
// Single peer "member1" with a placeholder address string.
885 Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
887 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
888 MockRaftActor.props(persistenceId, peerAddresses,
889 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
// Preset the context: commitIndex = lastApplied = 3, current term 1.
891 MockRaftActor leaderActor = mockActorRef.underlyingActor();
892 leaderActor.getRaftActorContext().setCommitIndex(3);
893 leaderActor.getRaftActorContext().setLastApplied(3);
894 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
896 leaderActor.waitForInitializeBehaviorComplete();
// Append four entries (term 1, indices 0-3) to the log.
897 for(int i=0;i< 4;i++) {
898 leaderActor.getReplicatedLog()
899 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
900 new MockRaftActorContext.MockPayload("A")));
903 Leader leader = new Leader(leaderActor.getRaftActorContext());
904 leaderActor.setCurrentBehavior(leader);
905 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
907 // Simulate an install snapshot to a follower.
908 leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
909 leaderActor.getReplicatedLog().last(), -1, "member1");
911 // Now send a CaptureSnapshotReply
912 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
914 // Trimming log in this scenario is a no-op
915 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
916 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
917 assertEquals(-1, leader.getReplicatedToAllIndex());
// Checks that processing a CaptureSnapshotReply on a leader whose snapshot index and
// replicatedToAllIndex are both 3 leaves those two values at 3.
923 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
924 new JavaTestKit(getSystem()) {{
925 String persistenceId = factory.generateActorId("leader-");
// Day-long intervals, presumably so that no heartbeat/election timer fires during the test.
926 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
927 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
928 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
929 config.setSnapshotBatchCount(5);
// Uses a NonPersistentDataProvider rather than a mock.
931 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
// Single peer "member1" with a placeholder address string.
933 Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
935 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
936 MockRaftActor.props(persistenceId, peerAddresses,
937 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
// Preset the context: commitIndex = lastApplied = 3, term 1, and snapshot index 3 on the log.
939 MockRaftActor leaderActor = mockActorRef.underlyingActor();
940 leaderActor.getRaftActorContext().setCommitIndex(3);
941 leaderActor.getRaftActorContext().setLastApplied(3);
942 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
943 leaderActor.getReplicatedLog().setSnapshotIndex(3);
945 leaderActor.waitForInitializeBehaviorComplete();
// Install a fresh Leader behavior and set its replicatedToAllIndex to 3.
946 Leader leader = new Leader(leaderActor.getRaftActorContext());
947 leaderActor.setCurrentBehavior(leader);
948 leader.setReplicatedToAllIndex(3);
949 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
951 // Persist another entry (this will cause a CaptureSnapshot to be triggered)
952 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
954 // Now send a CaptureSnapshotReply
955 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
957 // Trimming log in this scenario is a no-op
958 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
959 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
960 assertEquals(3, leader.getReplicatedToAllIndex());
// Seeds the in-memory journal with one replicated log entry, starts a MockRaftActor on that
// persistence id, and verifies that createSnapshot is invoked on the snapshot cohort delegate
// once recovery and behavior initialization have completed.
966 public void testRaftActorOnRecoverySnapshot() throws Exception {
967 TEST_LOG.info("testRaftActorOnRecoverySnapshot");
969 new JavaTestKit(getSystem()) {{
970 String persistenceId = factory.generateActorId("follower-");
972 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
974 // Set the heartbeat interval high to essentially disable election otherwise the test
975 // may fail if the actor is switched to Leader
976 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Single peer "member1" with a placeholder address string.
978 ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
980 // Create mock ReplicatedLogEntry
981 ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1,
982 new MockRaftActorContext.MockPayload("F", 1));
// Store the entry in the journal under sequence number 1 before the actor is created.
984 InMemoryJournal.addEntry(persistenceId, 1, replLogEntry);
986 TestActorRef<MockRaftActor> ref = factory.createTestActor(
987 MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
989 MockRaftActor mockRaftActor = ref.underlyingActor();
991 mockRaftActor.waitForRecoveryComplete();
993 mockRaftActor.waitForInitializeBehaviorComplete();
// Allow up to 5000 ms for the createSnapshot call to be observed.
995 verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
// Sends SwitchBehavior commands to a MockRaftActor configured with DisableElectionsRaftPolicy
// and no peers, and asserts the resulting current term and behavior state after each command.
1000 public void testSwitchBehavior(){
1001 String persistenceId = factory.generateActorId("leader-");
1002 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1003 config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
1004 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1005 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1006 config.setSnapshotBatchCount(5);
// Uses a NonPersistentDataProvider rather than a mock.
1008 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
// The peer map is empty.
1010 Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
1012 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1013 MockRaftActor.props(persistenceId, peerAddresses,
1014 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1016 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1018 leaderActor.waitForRecoveryComplete();
// Switching to Follower with term 100 must apply both the term and the state.
1020 leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
1022 assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1023 assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
// Switching to Leader with term 110 must apply both the term and the state.
1025 leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
1027 assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1028 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// A switch to Candidate is expected to be ignored: term and state stay at 110 / Leader.
1030 leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
1032 assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1033 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// A switch to IsolatedLeader is likewise expected to be ignored.
1035 leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
1037 assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
1038 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
/**
 * Serializes the given object with an {@link ObjectOutputStream} and returns the resulting
 * bytes as a {@link ByteString}.
 *
 * @param snapshot the object to serialize
 * @return a ByteString holding a copy of the serialized bytes
 * @throws Exception declared by the signature; presumably propagates serialization errors - confirm
 */
1041 public static ByteString fromObject(Object snapshot) throws Exception {
// Both streams are declared up front and assigned separately below.
1042 ByteArrayOutputStream b = null;
1043 ObjectOutputStream o = null;
1045 b = new ByteArrayOutputStream();
1046 o = new ObjectOutputStream(b);
1047 o.writeObject(snapshot);
// Take the bytes accumulated in the in-memory stream and copy them into the ByteString.
1048 byte[] snapshotBytes = b.toByteArray();
1049 return ByteString.copyFrom(snapshotBytes);
// Checks when updateConfigParams replaces the actor's current behavior object: re-applying the
// same config keeps the same behavior instance, while a config naming a different custom raft
// policy class yields a new instance. The behavior state is asserted to be Follower throughout.
1062 public void testUpdateConfigParam() throws Exception {
1063 DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
1064 String persistenceId = factory.generateActorId("follower-");
// Single peer "member1" with a placeholder address string.
1065 ImmutableMap<String, String> peerAddresses =
1066 ImmutableMap.<String, String>builder().put("member1", "address").build();
1067 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1069 TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
1070 MockRaftActor.props(persistenceId, peerAddresses,
1071 Optional.<ConfigParams>of(emptyConfig), dataPersistenceProvider), persistenceId);
1072 MockRaftActor mockRaftActor = actorRef.underlyingActor();
1073 mockRaftActor.waitForInitializeBehaviorComplete();
// Step 1: re-apply the config the actor was created with - same behavior instance expected.
1075 RaftActorBehavior behavior = mockRaftActor.getCurrentBehavior();
1076 mockRaftActor.updateConfigParams(emptyConfig);
1077 assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1078 assertEquals("Behavior State", RaftState.Follower,
1079 mockRaftActor.getCurrentBehavior().state());
// Step 2: apply a config naming DisableElectionsRaftPolicy - a different behavior instance expected.
1081 DefaultConfigParamsImpl disableConfig = new DefaultConfigParamsImpl();
1082 disableConfig.setCustomRaftPolicyImplementationClass(
1083 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
1084 mockRaftActor.updateConfigParams(disableConfig);
1085 assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
1086 assertEquals("Behavior State", RaftState.Follower,
1087 mockRaftActor.getCurrentBehavior().state());
// Step 3: apply the same DisableElectionsRaftPolicy config again - behavior must not change.
1089 behavior = mockRaftActor.getCurrentBehavior();
1090 mockRaftActor.updateConfigParams(disableConfig);
1091 assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1092 assertEquals("Behavior State", RaftState.Follower,
1093 mockRaftActor.getCurrentBehavior().state());
// Step 4: apply a config naming DefaultRaftPolicy - a different behavior instance expected.
1095 DefaultConfigParamsImpl defaultConfig = new DefaultConfigParamsImpl();
1096 defaultConfig.setCustomRaftPolicyImplementationClass(
1097 "org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy");
1098 mockRaftActor.updateConfigParams(defaultConfig);
1099 assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
1100 assertEquals("Behavior State", RaftState.Follower,
1101 mockRaftActor.getCurrentBehavior().state());
// Step 5: apply the same DefaultRaftPolicy config again - behavior must not change.
1103 behavior = mockRaftActor.getCurrentBehavior();
1104 mockRaftActor.updateConfigParams(defaultConfig);
1105 assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
1106 assertEquals("Behavior State", RaftState.Follower,
1107 mockRaftActor.getCurrentBehavior().state());
// Exercises the GetSnapshot message against a MockRaftActor recovered from a seeded journal in
// three scenarios: a normal reply, a reply-actor timeout, and persistence disabled.
// NOTE(review): the declarations of `term` and `seqN` are not visible in this excerpt.
1111 public void testGetSnapshot() throws Exception {
1112 TEST_LOG.info("testGetSnapshot starting");
// The kit's ref is used as the sender of GetSnapshot and receives the replies.
1114 JavaTestKit kit = new JavaTestKit(getSystem());
1116 String persistenceId = factory.generateActorId("test-actor-");
1117 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1118 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// Seed the journal: election term voted for "member-1", log entries at indices 0 and 1,
// an ApplyJournalEntries(1) marker, then a further log entry at index 2.
1122 InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
1123 InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0,
1124 new MockRaftActorContext.MockPayload("A")));
1125 InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1,
1126 new MockRaftActorContext.MockPayload("B")));
1127 InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
1128 InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2,
1129 new MockRaftActorContext.MockPayload("C")));
// The actor is created on the default dispatcher.
1131 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
1132 ImmutableMap.<String, String>builder().put("member1", "address").build(), Optional.<ConfigParams>of(config)).
1133 withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
1134 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
1136 mockRaftActor.waitForRecoveryComplete();
1138 // Wait for snapshot after recovery
1139 verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
// Swap in a fresh mock so the verification below only sees the GetSnapshot-triggered call.
1141 mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
1143 raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
// Capture the ActorRef handed to createSnapshot so the test can reply to it directly.
1145 ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
1146 verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture());
// Reply with a three-byte state on behalf of the cohort.
1148 byte[] stateSnapshot = new byte[]{1,2,3};
1149 replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender());
1151 GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
// The reply must reflect the seeded journal: applied through index 1, last index 2, and one
// unapplied entry (index 2).
1153 assertEquals("getId", persistenceId, reply.getId());
1154 Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
1155 assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1156 assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1157 assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
1158 assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
1159 assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
1160 assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
1161 assertArrayEquals("getState", stateSnapshot, replySnapshot.getState());
1162 assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
1163 assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
1165 // Test with timeout
// The reply-actor timeout is shortened to 200 ms and the test sends no CaptureSnapshotReply,
// so a Failure whose cause is a TimeoutException is expected.
1167 mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS));
1168 reset(mockRaftActor.snapshotCohortDelegate);
1170 raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1171 Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
1172 assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
// Raise the timeout to 30 seconds for the remaining scenario.
1174 mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS));
1176 // Test with persistence disabled.
1178 mockRaftActor.setPersistence(false);
1179 reset(mockRaftActor.snapshotCohortDelegate);
1181 raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
1182 reply = kit.expectMsgClass(GetSnapshotReply.class);
// With persistence off, the cohort delegate must not be asked to create a snapshot.
1183 verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class));
// The reply keeps the election info but has -1 indices/terms, empty state and no unapplied entries.
1185 assertEquals("getId", persistenceId, reply.getId());
1186 replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
1187 assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
1188 assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
1189 assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
1190 assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
1191 assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
1192 assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
1193 assertEquals("getState length", 0, replySnapshot.getState().length);
1194 assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
1196 TEST_LOG.info("testGetSnapshot ending");