2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.ArgumentMatchers.anyInt;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSystem;
24 import akka.actor.Props;
25 import akka.persistence.RecoveryCompleted;
26 import akka.persistence.SnapshotMetadata;
27 import akka.persistence.SnapshotOffer;
28 import akka.testkit.javadsl.TestKit;
29 import com.google.common.util.concurrent.MoreExecutors;
30 import java.io.OutputStream;
31 import java.util.List;
33 import java.util.Optional;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ScheduledFuture;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Consumer;
42 import org.junit.After;
43 import org.junit.Assert;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.junit.runner.RunWith;
47 import org.mockito.ArgumentMatchers;
48 import org.mockito.InOrder;
49 import org.mockito.Mock;
50 import org.mockito.Mockito;
51 import org.mockito.junit.MockitoJUnitRunner;
52 import org.opendaylight.controller.cluster.DataPersistenceProvider;
53 import org.opendaylight.controller.cluster.PersistentDataProvider;
54 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
55 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
56 import org.opendaylight.controller.cluster.raft.messages.Payload;
57 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
58 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
59 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
60 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
61 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
62 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
63 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
64 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
69 * Unit tests for RaftActorRecoverySupport.
71 * @author Thomas Pantelis
73 @RunWith(MockitoJUnitRunner.StrictStubs.class)
74 public class RaftActorRecoverySupportTest {
75 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
78 private DataPersistenceProvider mockPersistence;
81 private RaftActorRecoveryCohort mockCohort;
84 PersistentDataProvider mockPersistentProvider;
86 ActorRef mockActorRef;
88 ActorSystem mockActorSystem;
90 private RaftActorRecoverySupport support;
92 private RaftActorContext context;
93 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
94 private final String localId = "leader";
98 mockActorSystem = ActorSystem.create();
99 mockActorRef = mockActorSystem.actorOf(Props.create(DoNothingActor.class));
100 context = new RaftActorContextImpl(mockActorRef, null, localId,
101 new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1,
102 Map.of(), configParams, mockPersistence, applyState -> {
103 }, LOG, MoreExecutors.directExecutor());
105 support = new RaftActorRecoverySupport(context, mockCohort);
107 doReturn(true).when(mockPersistence).isRecoveryApplicable();
109 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
113 public void tearDown() {
114 TestKit.shutdownActorSystem(mockActorSystem);
117 private void sendMessageToSupport(final Object message) {
118 sendMessageToSupport(message, false);
121 private void sendMessageToSupport(final Object message, final boolean expComplete) {
122 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
123 assertEquals("complete", expComplete, complete);
127 public void testOnReplicatedLogEntry() {
128 ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
130 sendMessageToSupport(logEntry);
132 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
133 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
134 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
135 assertEquals("Last applied", -1, context.getLastApplied());
136 assertEquals("Commit index", -1, context.getCommitIndex());
137 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
138 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
142 public void testOnApplyJournalEntries() {
143 configParams.setJournalRecoveryLogBatchSize(5);
145 ReplicatedLog replicatedLog = context.getReplicatedLog();
146 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
147 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
148 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
149 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
150 replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
151 replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
153 sendMessageToSupport(new ApplyJournalEntries(2));
155 assertEquals("Last applied", 2, context.getLastApplied());
156 assertEquals("Commit index", 2, context.getCommitIndex());
158 sendMessageToSupport(new ApplyJournalEntries(4));
160 assertEquals("Last applied", 4, context.getLastApplied());
161 assertEquals("Last applied", 4, context.getLastApplied());
163 sendMessageToSupport(new ApplyJournalEntries(5));
165 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
166 assertEquals("Last applied", 5, context.getLastApplied());
167 assertEquals("Commit index", 5, context.getCommitIndex());
168 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
169 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
171 InOrder inOrder = Mockito.inOrder(mockCohort);
172 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
174 for (int i = 0; i < replicatedLog.size() - 1; i++) {
175 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
178 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
179 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
180 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
182 inOrder.verifyNoMoreInteractions();
186 public void testIncrementalRecovery() {
187 int recoverySnapshotInterval = 3;
188 int numberOfEntries = 5;
189 configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
190 Consumer<Optional<OutputStream>> mockSnapshotConsumer = mock(Consumer.class);
191 context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer);
193 ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor();
194 ReplicatedLog replicatedLog = context.getReplicatedLog();
196 for (int i = 0; i <= numberOfEntries; i++) {
197 replicatedLog.append(new SimpleReplicatedLogEntry(i, 1,
198 new MockRaftActorContext.MockPayload(String.valueOf(i))));
201 AtomicInteger entryCount = new AtomicInteger();
202 ScheduledFuture<?> applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> {
203 int run = entryCount.getAndIncrement();
204 LOG.info("Sending entry number {}", run);
205 sendMessageToSupport(new ApplyJournalEntries(run));
206 }, 0, 1, TimeUnit.SECONDS);
208 ScheduledFuture<Boolean> canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false),
209 numberOfEntries, TimeUnit.SECONDS);
212 verify(mockSnapshotConsumer, times(1)).accept(any());
213 applyEntriesExecutor.shutdown();
214 } catch (InterruptedException | ExecutionException e) {
220 public void testOnSnapshotOffer() {
222 ReplicatedLog replicatedLog = context.getReplicatedLog();
223 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
224 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
225 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
227 ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
228 new MockRaftActorContext.MockPayload("4", 4));
230 ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
231 new MockRaftActorContext.MockPayload("5", 5));
233 long lastAppliedDuringSnapshotCapture = 3;
234 long lastIndexDuringSnapshotCapture = 5;
235 long electionTerm = 2;
236 String electionVotedFor = "member-2";
238 MockSnapshotState snapshotState = new MockSnapshotState(List.of(new MockPayload("1")));
239 Snapshot snapshot = Snapshot.create(snapshotState,
240 List.of(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
241 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
243 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
244 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
246 sendMessageToSupport(snapshotOffer);
248 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
249 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
250 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
251 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
252 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
253 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
254 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
255 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
256 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
257 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
259 verify(mockCohort).applyRecoverySnapshot(snapshotState);
263 public void testOnRecoveryCompletedWithRemainingBatch() {
264 ReplicatedLog replicatedLog = context.getReplicatedLog();
265 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
266 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
268 sendMessageToSupport(new ApplyJournalEntries(1));
270 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
272 assertEquals("Last applied", 1, context.getLastApplied());
273 assertEquals("Commit index", 1, context.getCommitIndex());
275 InOrder inOrder = Mockito.inOrder(mockCohort);
276 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
278 for (int i = 0; i < replicatedLog.size(); i++) {
279 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
282 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
283 inOrder.verify(mockCohort).getRestoreFromSnapshot();
284 inOrder.verifyNoMoreInteractions();
288 public void testOnRecoveryCompletedWithNoRemainingBatch() {
289 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
291 verify(mockCohort).getRestoreFromSnapshot();
292 verifyNoMoreInteractions(mockCohort);
296 public void testOnDeleteEntries() {
297 ReplicatedLog replicatedLog = context.getReplicatedLog();
298 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
299 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
300 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
302 sendMessageToSupport(new DeleteEntries(1));
304 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
305 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
309 public void testUpdateElectionTerm() {
311 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
313 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
314 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
318 public void testDataRecoveredWithPersistenceDisabled() {
319 doReturn(false).when(mockPersistence).isRecoveryApplicable();
320 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
322 Snapshot snapshot = Snapshot.create(new MockSnapshotState(List.of(new MockPayload("1"))),
323 List.of(), 3, 1, 3, 1, -1, null, null);
324 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
326 sendMessageToSupport(snapshotOffer);
328 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
330 sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
331 sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
333 sendMessageToSupport(new ApplyJournalEntries(4));
335 sendMessageToSupport(new DeleteEntries(5));
337 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
338 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
339 assertEquals("Last applied", -1, context.getLastApplied());
340 assertEquals("Commit index", -1, context.getCommitIndex());
341 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
342 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
344 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
345 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
347 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
349 verify(mockCohort, never()).applyRecoverySnapshot(any());
350 verify(mockCohort, never()).getRestoreFromSnapshot();
351 verifyNoMoreInteractions(mockCohort);
353 verify(mockPersistentProvider).deleteMessages(10L);
356 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
357 return ArgumentMatchers.argThat(other ->
358 term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
362 public void testNoDataRecoveredWithPersistenceDisabled() {
363 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
365 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
366 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
368 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
370 verify(mockCohort).getRestoreFromSnapshot();
371 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
375 public void testServerConfigurationPayloadApplied() {
376 String follower1 = "follower1";
377 String follower2 = "follower2";
378 String follower3 = "follower3";
380 context.addToPeers(follower1, null, VotingState.VOTING);
381 context.addToPeers(follower2, null, VotingState.VOTING);
384 ServerConfigurationPayload obj = new ServerConfigurationPayload(List.of(
385 new ServerInfo(localId, true),
386 new ServerInfo(follower1, true),
387 new ServerInfo(follower2, false),
388 new ServerInfo(follower3, true)));
390 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
393 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
394 assertEquals("New peer Ids", Set.of(follower1, follower2, follower3), Set.copyOf(context.getPeerIds()));
395 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
396 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
397 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
399 sendMessageToSupport(new ApplyJournalEntries(0));
401 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
402 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
404 //remove existing follower1
405 obj = new ServerConfigurationPayload(List.of(
406 new ServerInfo(localId, true),
407 new ServerInfo("follower2", true),
408 new ServerInfo("follower3", true)));
410 sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
413 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
414 assertEquals("New peer Ids", Set.of(follower2, follower3), Set.copyOf(context.getPeerIds()));
418 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
419 doReturn(false).when(mockPersistence).isRecoveryApplicable();
421 String follower = "follower";
422 ServerConfigurationPayload obj = new ServerConfigurationPayload(List.of(
423 new ServerInfo(localId, true), new ServerInfo(follower, true)));
425 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
428 assertEquals("New peer Ids", Set.of(follower), Set.copyOf(context.getPeerIds()));
432 public void testOnSnapshotOfferWithServerConfiguration() {
433 long electionTerm = 2;
434 String electionVotedFor = "member-2";
435 ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(List.of(
436 new ServerInfo(localId, true),
437 new ServerInfo("follower1", true),
438 new ServerInfo("follower2", true)));
440 MockSnapshotState snapshotState = new MockSnapshotState(List.of(new MockPayload("1")));
441 Snapshot snapshot = Snapshot.create(snapshotState, List.of(),
442 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
444 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
445 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
447 sendMessageToSupport(snapshotOffer);
449 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
450 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
451 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
452 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
453 assertEquals("Peer List", Set.of("follower1", "follower2"), Set.copyOf(context.getPeerIds()));