/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.japi.Procedure;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
/**
 * Test class for raft actor behaviour; extends AbstractActorTest, whose getSystem() supplies
 * the actor system used by the fixture below.
 *
 * NOTE(review): no JUnit annotations are visible on the members in this listing even though
 * org.junit.Before, org.junit.After and org.junit.Test are imported - confirm they are present
 * in the real file.
 */
77 public class RaftActorTest extends AbstractActorTest {
// Logger shared by the tests in this class.
79 static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
// Factory used to generate actor ids and to create (test) actors; assigned below.
81 private TestActorFactory factory;
// NOTE(review): the method header enclosing this statement is not visible in this listing;
// presumably it is the per-test set-up method - confirm.
85 factory = new TestActorFactory(getSystem());
/**
 * Clears the static InMemoryJournal and InMemorySnapshotStore.
 *
 * NOTE(review): presumably run after each test so persisted data does not leak between
 * tests - confirm. Nothing visible here releases the actors created through {@code factory};
 * a statement appears to be missing from this listing right after the method header -
 * confirm against the real file.
 *
 * @throws Exception declared by the signature; the visible statements are plain static calls
 */
89 public void tearDown() throws Exception {
91 InMemoryJournal.clear();
92 InMemorySnapshotStore.clear();
96 public void testConstruction() {
97 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
101 public void testFindLeaderWhenLeaderIsSelf(){
102 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
103 kit.waitUntilLeader();
107 public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
108 TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
110 new JavaTestKit(getSystem()) {{
111 String persistenceId = factory.generateActorId("follower-");
113 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
115 // Set the heartbeat interval high to essentially disable election otherwise the test
116 // may fail if the actor is switched to Leader and the commitIndex is set to the last
118 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
120 ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
121 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
122 peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
124 watch(followerActor);
126 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
127 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
128 new MockRaftActorContext.MockPayload("E"));
129 snapshotUnappliedEntries.add(entry1);
131 int lastAppliedDuringSnapshotCapture = 3;
132 int lastIndexDuringSnapshotCapture = 4;
134 // 4 messages as part of snapshot, which are applied to state
135 ByteString snapshotBytes = fromObject(Arrays.asList(
136 new MockRaftActorContext.MockPayload("A"),
137 new MockRaftActorContext.MockPayload("B"),
138 new MockRaftActorContext.MockPayload("C"),
139 new MockRaftActorContext.MockPayload("D")));
141 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
142 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
143 lastAppliedDuringSnapshotCapture, 1);
144 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
146 // add more entries after snapshot is taken
147 List<ReplicatedLogEntry> entries = new ArrayList<>();
148 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
149 new MockRaftActorContext.MockPayload("F", 2));
150 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
151 new MockRaftActorContext.MockPayload("G", 3));
152 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
153 new MockRaftActorContext.MockPayload("H", 4));
158 int lastAppliedToState = 5;
161 InMemoryJournal.addEntry(persistenceId, 5, entry2);
162 // 2 entries are applied to state besides the 4 entries in snapshot
163 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
164 InMemoryJournal.addEntry(persistenceId, 7, entry3);
165 InMemoryJournal.addEntry(persistenceId, 8, entry4);
168 followerActor.tell(PoisonPill.getInstance(), null);
169 expectMsgClass(duration("5 seconds"), Terminated.class);
171 unwatch(followerActor);
173 //reinstate the actor
174 TestActorRef<MockRaftActor> ref = factory.createTestActor(
175 MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
177 MockRaftActor mockRaftActor = ref.underlyingActor();
179 mockRaftActor.waitForRecoveryComplete();
181 RaftActorContext context = mockRaftActor.getRaftActorContext();
182 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
183 context.getReplicatedLog().size());
184 assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
185 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
186 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
187 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
188 assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
190 mockRaftActor.waitForInitializeBehaviorComplete();
192 assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
195 TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
199 public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
200 new JavaTestKit(getSystem()) {{
201 String persistenceId = factory.generateActorId("follower-");
203 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
205 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
207 TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
208 ImmutableMap.<String, String>builder().put("member1", "address").build(),
209 Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
211 MockRaftActor mockRaftActor = ref.underlyingActor();
213 mockRaftActor.waitForRecoveryComplete();
215 mockRaftActor.waitForInitializeBehaviorComplete();
217 assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
222 public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
223 new JavaTestKit(getSystem()) {{
224 String persistenceId = factory.generateActorId("follower-");
225 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
226 config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
227 config.setElectionTimeoutFactor(1);
229 InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
231 TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
232 ImmutableMap.<String, String>builder().put("member1", "address").build(),
233 Optional.<ConfigParams>of(config), new NonPersistentDataProvider()).
234 withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
236 InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
237 List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
238 assertEquals("UpdateElectionTerm entries", 1, entries.size());
239 UpdateElectionTerm updateEntry = entries.get(0);
241 factory.killActor(ref, this);
243 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
244 ref = factory.createTestActor(MockRaftActor.props(persistenceId,
245 ImmutableMap.<String, String>builder().put("member1", "address").build(),
246 Optional.<ConfigParams>of(config), new NonPersistentDataProvider()).
247 withDispatcher(Dispatchers.DefaultDispatcherId()),
248 factory.generateActorId("follower-"));
250 MockRaftActor actor = ref.underlyingActor();
251 actor.waitForRecoveryComplete();
253 RaftActorContext newContext = actor.getRaftActorContext();
254 assertEquals("electionTerm", updateEntry.getCurrentTerm(),
255 newContext.getTermInformation().getCurrentTerm());
256 assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
258 entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
259 assertEquals("UpdateElectionTerm entries", 1, entries.size());
264 public void testRaftActorForwardsToRaftActorRecoverySupport() {
265 String persistenceId = factory.generateActorId("leader-");
267 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
269 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
271 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
272 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
274 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
276 // Wait for akka's recovery to complete so it doesn't interfere.
277 mockRaftActor.waitForRecoveryComplete();
279 RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
280 mockRaftActor.setRaftActorRecoverySupport(mockSupport );
282 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
283 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
284 mockRaftActor.handleRecover(snapshotOffer);
286 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
287 1, new MockRaftActorContext.MockPayload("1", 5));
288 mockRaftActor.handleRecover(logEntry);
290 ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
291 mockRaftActor.handleRecover(applyJournalEntries);
293 ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
294 mockRaftActor.handleRecover(applyLogEntries);
296 DeleteEntries deleteEntries = new DeleteEntries(1);
297 mockRaftActor.handleRecover(deleteEntries);
299 org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
300 new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
301 mockRaftActor.handleRecover(deprecatedDeleteEntries);
303 UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
304 mockRaftActor.handleRecover(updateElectionTerm);
306 org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
307 new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
308 mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
310 verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
311 verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
312 verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
313 verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class));
314 verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
315 verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
316 verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
317 verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class));
321 public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
322 String persistenceId = factory.generateActorId("leader-");
324 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
326 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
328 RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
330 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
331 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), mockSupport), persistenceId);
333 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
335 // Wait for akka's recovery to complete so it doesn't interfere.
336 mockRaftActor.waitForRecoveryComplete();
338 ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
339 doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
340 mockRaftActor.handleCommand(applySnapshot);
342 CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
343 doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
344 mockRaftActor.handleCommand(captureSnapshot);
346 CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
347 doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
348 mockRaftActor.handleCommand(captureSnapshotReply);
350 SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
351 doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
352 mockRaftActor.handleCommand(saveSnapshotSuccess);
354 SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
355 doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
356 mockRaftActor.handleCommand(saveSnapshotFailure);
358 doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
359 mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
361 verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
362 verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
363 verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
364 verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
365 verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
366 verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
370 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
371 new JavaTestKit(getSystem()) {
373 String persistenceId = factory.generateActorId("leader-");
375 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
377 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
379 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
381 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
382 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
384 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
386 mockRaftActor.waitForInitializeBehaviorComplete();
388 mockRaftActor.waitUntilLeader();
390 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
392 verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
400 public void testApplyState() throws Exception {
402 new JavaTestKit(getSystem()) {
404 String persistenceId = factory.generateActorId("leader-");
406 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
408 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
410 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
412 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
413 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
415 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
417 mockRaftActor.waitForInitializeBehaviorComplete();
419 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
420 new MockRaftActorContext.MockPayload("F"));
422 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
424 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
431 public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
432 new JavaTestKit(getSystem()) {{
433 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
434 Props.create(MessageCollectorActor.class));
435 MessageCollectorActor.waitUntilReady(notifierActor);
437 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
438 long heartBeatInterval = 100;
439 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
440 config.setElectionTimeoutFactor(20);
442 String persistenceId = factory.generateActorId("notifier-");
444 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
445 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
446 new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
448 List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
451 // check if the notifier got a role change from null to Follower
452 RoleChanged raftRoleChanged = matches.get(0);
453 assertEquals(persistenceId, raftRoleChanged.getMemberId());
454 assertNull(raftRoleChanged.getOldRole());
455 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
457 // check if the notifier got a role change from Follower to Candidate
458 raftRoleChanged = matches.get(1);
459 assertEquals(persistenceId, raftRoleChanged.getMemberId());
460 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
461 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
463 // check if the notifier got a role change from Candidate to Leader
464 raftRoleChanged = matches.get(2);
465 assertEquals(persistenceId, raftRoleChanged.getMemberId());
466 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
467 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
469 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
470 notifierActor, LeaderStateChanged.class);
472 assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
473 assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
475 notifierActor.underlyingActor().clear();
477 MockRaftActor raftActor = raftActorRef.underlyingActor();
478 final String newLeaderId = "new-leader";
479 final short newLeaderVersion = 6;
480 Follower follower = new Follower(raftActor.getRaftActorContext()) {
482 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
483 leaderId = newLeaderId;
484 setLeaderPayloadVersion(newLeaderVersion);
489 raftActor.newBehavior(follower);
491 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
492 assertEquals(persistenceId, leaderStateChange.getMemberId());
493 assertEquals(null, leaderStateChange.getLeaderId());
495 raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
496 assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
497 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
499 notifierActor.underlyingActor().clear();
501 raftActor.handleCommand("any");
503 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
504 assertEquals(persistenceId, leaderStateChange.getMemberId());
505 assertEquals(newLeaderId, leaderStateChange.getLeaderId());
506 assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
508 notifierActor.underlyingActor().clear();
510 raftActor.handleCommand("any");
512 Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
513 leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
514 assertNull(leaderStateChange);
519 public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
520 new JavaTestKit(getSystem()) {{
521 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
522 MessageCollectorActor.waitUntilReady(notifierActor);
524 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
525 long heartBeatInterval = 100;
526 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
527 config.setElectionTimeoutFactor(1);
529 String persistenceId = factory.generateActorId("notifier-");
531 factory.createActor(MockRaftActor.props(persistenceId,
532 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
534 List<RoleChanged> matches = null;
535 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
536 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
537 assertNotNull(matches);
538 if(matches.size() == 3) {
541 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
544 assertNotNull(matches);
545 assertEquals(2, matches.size());
547 // check if the notifier got a role change from null to Follower
548 RoleChanged raftRoleChanged = matches.get(0);
549 assertEquals(persistenceId, raftRoleChanged.getMemberId());
550 assertNull(raftRoleChanged.getOldRole());
551 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
553 // check if the notifier got a role change from Follower to Candidate
554 raftRoleChanged = matches.get(1);
555 assertEquals(persistenceId, raftRoleChanged.getMemberId());
556 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
557 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
563 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
564 new JavaTestKit(getSystem()) {
566 String persistenceId = factory.generateActorId("leader-");
567 String follower1Id = factory.generateActorId("follower-");
569 ActorRef followerActor1 =
570 factory.createActor(Props.create(MessageCollectorActor.class));
572 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
573 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
574 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
576 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
578 Map<String, String> peerAddresses = new HashMap<>();
579 peerAddresses.put(follower1Id, followerActor1.path().toString());
581 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
582 MockRaftActor.props(persistenceId, peerAddresses,
583 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
585 MockRaftActor leaderActor = mockActorRef.underlyingActor();
587 leaderActor.getRaftActorContext().setCommitIndex(4);
588 leaderActor.getRaftActorContext().setLastApplied(4);
589 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
591 leaderActor.waitForInitializeBehaviorComplete();
593 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
595 Leader leader = new Leader(leaderActor.getRaftActorContext());
596 leaderActor.setCurrentBehavior(leader);
597 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
599 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
600 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
602 assertEquals(8, leaderActor.getReplicatedLog().size());
604 leaderActor.getRaftActorContext().getSnapshotManager()
605 .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
606 new MockRaftActorContext.MockPayload("x")), 4);
608 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
610 assertEquals(8, leaderActor.getReplicatedLog().size());
612 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
613 //fake snapshot on index 5
614 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
616 assertEquals(8, leaderActor.getReplicatedLog().size());
618 //fake snapshot on index 6
619 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
620 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
621 assertEquals(8, leaderActor.getReplicatedLog().size());
623 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
625 assertEquals(8, leaderActor.getReplicatedLog().size());
627 ByteString snapshotBytes = fromObject(Arrays.asList(
628 new MockRaftActorContext.MockPayload("foo-0"),
629 new MockRaftActorContext.MockPayload("foo-1"),
630 new MockRaftActorContext.MockPayload("foo-2"),
631 new MockRaftActorContext.MockPayload("foo-3"),
632 new MockRaftActorContext.MockPayload("foo-4")));
634 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
635 leader, Runtime.getRuntime().totalMemory());
637 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
639 // The commit is needed to complete the snapshot creation process
640 leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
642 // capture snapshot reply should remove the snapshotted entries only
643 assertEquals(3, leaderActor.getReplicatedLog().size());
644 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
646 // add another non-replicated entry
647 leaderActor.getReplicatedLog().append(
648 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
650 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
651 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
652 assertEquals(2, leaderActor.getReplicatedLog().size());
653 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
660 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
661 new JavaTestKit(getSystem()) {
663 String persistenceId = factory.generateActorId("follower-");
664 String leaderId = factory.generateActorId("leader-");
667 ActorRef leaderActor1 =
668 factory.createActor(Props.create(MessageCollectorActor.class));
670 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
671 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
672 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
674 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
676 Map<String, String> peerAddresses = new HashMap<>();
677 peerAddresses.put(leaderId, leaderActor1.path().toString());
679 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
680 MockRaftActor.props(persistenceId, peerAddresses,
681 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
683 MockRaftActor followerActor = mockActorRef.underlyingActor();
684 followerActor.getRaftActorContext().setCommitIndex(4);
685 followerActor.getRaftActorContext().setLastApplied(4);
686 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
688 followerActor.waitForInitializeBehaviorComplete();
691 Follower follower = new Follower(followerActor.getRaftActorContext());
692 followerActor.setCurrentBehavior(follower);
693 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
695 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
696 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
697 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
699 // log has indices 0-5
700 assertEquals(6, followerActor.getReplicatedLog().size());
703 followerActor.getRaftActorContext().getSnapshotManager().capture(
704 new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
705 new MockRaftActorContext.MockPayload("D")), 4);
707 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
709 assertEquals(6, followerActor.getReplicatedLog().size());
711 //fake snapshot on index 6
712 List<ReplicatedLogEntry> entries =
714 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
715 new MockRaftActorContext.MockPayload("foo-6"))
717 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
718 assertEquals(7, followerActor.getReplicatedLog().size());
720 //fake snapshot on index 7
721 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
725 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
726 new MockRaftActorContext.MockPayload("foo-7"))
728 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
729 assertEquals(8, followerActor.getReplicatedLog().size());
731 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
734 ByteString snapshotBytes = fromObject(Arrays.asList(
735 new MockRaftActorContext.MockPayload("foo-0"),
736 new MockRaftActorContext.MockPayload("foo-1"),
737 new MockRaftActorContext.MockPayload("foo-2"),
738 new MockRaftActorContext.MockPayload("foo-3"),
739 new MockRaftActorContext.MockPayload("foo-4")));
740 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
741 assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
743 // The commit is needed to complete the snapshot creation process
744 followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
746 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
747 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
748 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
752 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
753 new MockRaftActorContext.MockPayload("foo-7"))
755 // send an additional entry 8 with leaderCommit = 7
756 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
758 // 7 and 8, as lastapplied is 7
759 assertEquals(2, followerActor.getReplicatedLog().size());
// Drives a leader with two message-collecting followers through a snapshot capture.
// Per the assertions below: replies from followers leave the 5-entry log untouched while
// the capture is pending, the CaptureSnapshotReply empties the log, and a late reply from
// the slow follower afterwards leaves the log empty.
766 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
767 new JavaTestKit(getSystem()) {
// Generated actor ids for the leader under test and its two peers.
769 String persistenceId = factory.generateActorId("leader-");
770 String follower1Id = factory.generateActorId("follower-");
771 String follower2Id = factory.generateActorId("follower-");
// Both followers are plain MessageCollectorActor instances, not Raft actors.
773 ActorRef followerActor1 =
774 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
775 ActorRef followerActor2 =
776 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
// One-day intervals: presumably chosen so that no heartbeat or isolated-leader check
// fires on its own during the test - TODO confirm.
778 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
779 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
780 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Mockito mock; no stubbing is configured for it in this test.
782 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
// Peer map: follower id -> actor path of the corresponding collector actor.
784 Map<String, String> peerAddresses = new HashMap<>();
785 peerAddresses.put(follower1Id, followerActor1.path().toString());
786 peerAddresses.put(follower2Id, followerActor2.path().toString());
788 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
789 MockRaftActor.props(persistenceId, peerAddresses,
790 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
// Work directly on the underlying actor: commit index 9, last applied 9, term information (1, persistenceId).
792 MockRaftActor leaderActor = mockActorRef.underlyingActor();
793 leaderActor.getRaftActorContext().setCommitIndex(9);
794 leaderActor.getRaftActorContext().setLastApplied(9);
795 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
797 leaderActor.waitForInitializeBehaviorComplete();
// Install the Leader behavior explicitly instead of waiting for an election.
799 Leader leader = new Leader(leaderActor.getRaftActorContext());
800 leaderActor.setCurrentBehavior(leader);
801 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
803 // create 5 entries in the log (createEntries(5, 10, 1); the size assertion below confirms the count)
804 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
805 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
807 // set the snapshot index to 4, i.e. indices 0 to 4 are treated as already snapshotted
808 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
809 // set replicatedToAllIndex = 9, for the log to clear
810 leader.setReplicatedToAllIndex(9);
811 assertEquals(5, leaderActor.getReplicatedLog().size());
812 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Successful reply from follower 1 carrying 9 (presumably its last log index - confirm
// against the AppendEntriesReply constructor); the log must keep its 5 entries.
814 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
815 assertEquals(5, leaderActor.getReplicatedLog().size());
816 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
818 // set the 2nd follower nextIndex to 1 which has been snapshotted
819 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
820 assertEquals(5, leaderActor.getReplicatedLog().size());
821 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
823 // simulate a real snapshot
// NOTE(review): the heartbeat presumably makes the leader initiate a capture for the
// lagging follower (inferred from the assertion message further down) - confirm.
824 leaderActor.onReceiveCommand(new SendHeartBeat());
825 assertEquals(5, leaderActor.getReplicatedLog().size());
826 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
827 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
828 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
831 // reply from a slow follower does not initiate a fake snapshot
832 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
833 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
// Serialized list of five payloads used as the body of the CaptureSnapshotReply.
835 ByteString snapshotBytes = fromObject(Arrays.asList(
836 new MockRaftActorContext.MockPayload("foo-0"),
837 new MockRaftActorContext.MockPayload("foo-1"),
838 new MockRaftActorContext.MockPayload("foo-2"),
839 new MockRaftActorContext.MockPayload("foo-3"),
840 new MockRaftActorContext.MockPayload("foo-4")));
841 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
// The capture is still reported as in progress after the reply; no commit is issued in this test.
842 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
844 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
846 // a reply from a slow follower afterwards should not raise errors
847 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
848 assertEquals(0, leaderActor.getReplicatedLog().size());
// Captures a snapshot for installation on a follower while -1 is passed as the
// replicated-to-all index, then delivers the CaptureSnapshotReply. The assertions expect
// the snapshot index and the leader's replicatedToAllIndex to both remain -1.
854 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
855 new JavaTestKit(getSystem()) {{
856 String persistenceId = factory.generateActorId("leader-");
// One-day intervals: presumably to keep periodic heartbeat / isolated-leader checks from
// firing during the test - TODO confirm.
857 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
858 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
859 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
860 config.setSnapshotBatchCount(5);
862 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
// Single peer "member1" with the placeholder address string "address".
864 Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
866 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
867 MockRaftActor.props(persistenceId, peerAddresses,
868 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
// Work directly on the underlying actor: commit index 3, last applied 3, term information (1, persistenceId).
870 MockRaftActor leaderActor = mockActorRef.underlyingActor();
871 leaderActor.getRaftActorContext().setCommitIndex(3);
872 leaderActor.getRaftActorContext().setLastApplied(3);
873 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
875 leaderActor.waitForInitializeBehaviorComplete();
// Append four entries with payload "A"; the second constructor argument runs 0..3
// (presumably the entry index, with 1 as the term - confirm against MockReplicatedLogEntry).
876 for(int i=0;i< 4;i++) {
877 leaderActor.getReplicatedLog()
878 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
879 new MockRaftActorContext.MockPayload("A")));
// Install the Leader behavior explicitly instead of waiting for an election.
882 Leader leader = new Leader(leaderActor.getRaftActorContext());
883 leaderActor.setCurrentBehavior(leader);
884 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
886 // Simulate an install snapshot to a follower: capture from the last log entry, passing -1 and "member1".
887 leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
888 leaderActor.getReplicatedLog().last(), -1, "member1");
890 // Now send a CaptureSnapshotReply (through the TestActorRef, with the actor itself as sender)
891 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
893 // Trimming log in this scenario is a no-op: snapshot index and replicatedToAllIndex stay at -1
894 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
895 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
896 assertEquals(-1, leader.getReplicatedToAllIndex());
// Starts from a leader whose snapshot index and replicatedToAllIndex are both 3 and whose
// log receives no entries before persistData is called, then delivers a
// CaptureSnapshotReply. The assertions expect both indices to remain 3.
902 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
903 new JavaTestKit(getSystem()) {{
904 String persistenceId = factory.generateActorId("leader-");
// One-day intervals: presumably to keep periodic heartbeat / isolated-leader checks from
// firing during the test - TODO confirm.
905 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
906 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
907 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
908 config.setSnapshotBatchCount(5);
910 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
// Single peer "member1" with the placeholder address string "address".
912 Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
914 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
915 MockRaftActor.props(persistenceId, peerAddresses,
916 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
// Commit index, last applied and snapshot index are all set to 3; no log entries are appended here.
918 MockRaftActor leaderActor = mockActorRef.underlyingActor();
919 leaderActor.getRaftActorContext().setCommitIndex(3);
920 leaderActor.getRaftActorContext().setLastApplied(3);
921 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
922 leaderActor.getReplicatedLog().setSnapshotIndex(3);
924 leaderActor.waitForInitializeBehaviorComplete();
// Install the Leader behavior explicitly and set its replicatedToAllIndex to 3.
925 Leader leader = new Leader(leaderActor.getRaftActorContext());
926 leaderActor.setCurrentBehavior(leader);
927 leader.setReplicatedToAllIndex(3);
928 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
930 // Persist another entry (this will cause a CaptureSnapshot to be triggered)
931 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
933 // Now send a CaptureSnapshotReply (through the TestActorRef, with the actor itself as sender)
934 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
936 // Trimming log in this scenario is a no-op: snapshot index and replicatedToAllIndex stay at 3
937 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
938 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
939 assertEquals(3, leader.getReplicatedToAllIndex());
// Sends a sequence of SwitchBehavior commands to a peerless actor and checks the resulting
// term and state. Per the assertions: switching to Follower (term 100) and to Leader
// (term 110) takes effect, while requests for Candidate and IsolatedLeader (term 125)
// leave the actor as Leader in term 110.
945 public void testSwitchBehavior(){
946 String persistenceId = factory.generateActorId("leader-");
947 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// Custom raft policy class; judging by its name it disables elections, presumably so that
// only the SwitchBehavior commands below change the role - TODO confirm.
948 config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
949 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
950 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
951 config.setSnapshotBatchCount(5);
953 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
// Empty peer map: the actor has no peers.
955 Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
957 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
958 MockRaftActor.props(persistenceId, peerAddresses,
959 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
961 MockRaftActor leaderActor = mockActorRef.underlyingActor();
963 leaderActor.waitForRecoveryComplete();
// Switch to Follower with term 100: both the term and the state are expected to change.
965 leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
967 assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
968 assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
// Switch to Leader with term 110: both the term and the state are expected to change.
970 leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
972 assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
973 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Request Candidate with term 125: expected to be ignored (term stays 110, state stays Leader).
975 leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
977 assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
978 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Request IsolatedLeader with term 125: expected to be ignored as well.
980 leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
982 assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
983 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
/**
 * Serializes the given object with Java object serialization and returns the resulting
 * bytes wrapped in a {@link ByteString}.
 *
 * @param snapshot the object handed to {@link ObjectOutputStream#writeObject(Object)}
 * @return a {@code ByteString} copied from the serialized bytes
 * @throws Exception declared by the signature; the visible statements can raise I/O errors
 *         from the object stream
 */
986 public static ByteString fromObject(Object snapshot) throws Exception {
// Both streams start out as null and are assigned below.
// NOTE(review): any cleanup of these streams is not visible in this part of the file - confirm.
987 ByteArrayOutputStream b = null;
988 ObjectOutputStream o = null;
// The object stream writes into the in-memory byte buffer.
990 b = new ByteArrayOutputStream();
991 o = new ObjectOutputStream(b);
992 o.writeObject(snapshot);
// Take the buffer contents and copy them into a ByteString.
993 byte[] snapshotBytes = b.toByteArray();
994 return ByteString.copyFrom(snapshotBytes);