2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.ArgumentMatchers.anyInt;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSystem;
25 import akka.actor.Props;
26 import akka.persistence.RecoveryCompleted;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import com.google.common.collect.Sets;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import java.io.OutputStream;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.Optional;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ScheduledFuture;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Consumer;
42 import org.junit.Assert;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.mockito.ArgumentMatchers;
46 import org.mockito.InOrder;
47 import org.mockito.Mock;
48 import org.mockito.Mockito;
49 import org.mockito.MockitoAnnotations;
50 import org.opendaylight.controller.cluster.DataPersistenceProvider;
51 import org.opendaylight.controller.cluster.PersistentDataProvider;
52 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
53 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
54 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
55 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
56 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
57 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
58 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
59 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
60 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
61 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
62 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
67 * Unit tests for RaftActorRecoverySupport.
69 * @author Thomas Pantelis
71 public class RaftActorRecoverySupportTest {
73 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
76 private DataPersistenceProvider mockPersistence;
79 private RaftActorRecoveryCohort mockCohort;
82 PersistentDataProvider mockPersistentProvider;
84 ActorRef mockActorRef;
86 ActorSystem mockActorSystem;
88 private RaftActorRecoverySupport support;
90 private RaftActorContext context;
91 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
92 private final String localId = "leader";
96 MockitoAnnotations.initMocks(this);
97 mockActorSystem = ActorSystem.create();
98 mockActorRef = mockActorSystem.actorOf(Props.create(DoNothingActor.class));
99 context = new RaftActorContextImpl(mockActorRef, null, localId,
100 new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1,
101 Collections.<String, String>emptyMap(), configParams, mockPersistence, applyState -> {
102 }, LOG, MoreExecutors.directExecutor());
104 support = new RaftActorRecoverySupport(context, mockCohort);
106 doReturn(true).when(mockPersistence).isRecoveryApplicable();
108 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
111 private void sendMessageToSupport(final Object message) {
112 sendMessageToSupport(message, false);
115 private void sendMessageToSupport(final Object message, final boolean expComplete) {
116 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
117 assertEquals("complete", expComplete, complete);
121 public void testOnReplicatedLogEntry() {
122 ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
124 sendMessageToSupport(logEntry);
126 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
127 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
128 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
129 assertEquals("Last applied", -1, context.getLastApplied());
130 assertEquals("Commit index", -1, context.getCommitIndex());
131 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
132 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
136 public void testOnApplyJournalEntries() {
137 configParams.setJournalRecoveryLogBatchSize(5);
139 ReplicatedLog replicatedLog = context.getReplicatedLog();
140 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
141 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
142 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
143 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
144 replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
145 replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
147 sendMessageToSupport(new ApplyJournalEntries(2));
149 assertEquals("Last applied", 2, context.getLastApplied());
150 assertEquals("Commit index", 2, context.getCommitIndex());
152 sendMessageToSupport(new ApplyJournalEntries(4));
154 assertEquals("Last applied", 4, context.getLastApplied());
155 assertEquals("Last applied", 4, context.getLastApplied());
157 sendMessageToSupport(new ApplyJournalEntries(5));
159 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
160 assertEquals("Last applied", 5, context.getLastApplied());
161 assertEquals("Commit index", 5, context.getCommitIndex());
162 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
163 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
165 InOrder inOrder = Mockito.inOrder(mockCohort);
166 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
168 for (int i = 0; i < replicatedLog.size() - 1; i++) {
169 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
172 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
173 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
174 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
176 inOrder.verifyNoMoreInteractions();
180 public void testIncrementalRecovery() {
181 int recoverySnapshotInterval = 3;
182 int numberOfEntries = 5;
183 configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
184 Consumer<Optional<OutputStream>> mockSnapshotConsumer = mock(Consumer.class);
185 context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer);
187 ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor();
188 ReplicatedLog replicatedLog = context.getReplicatedLog();
190 for (int i = 0; i <= numberOfEntries; i++) {
191 replicatedLog.append(new SimpleReplicatedLogEntry(i, 1,
192 new MockRaftActorContext.MockPayload(String.valueOf(i))));
195 AtomicInteger entryCount = new AtomicInteger();
196 ScheduledFuture<?> applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> {
197 int run = entryCount.getAndIncrement();
198 LOG.info("Sending entry number {}", run);
199 sendMessageToSupport(new ApplyJournalEntries(run));
200 }, 0, 1, TimeUnit.SECONDS);
202 ScheduledFuture<Boolean> canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false),
203 numberOfEntries, TimeUnit.SECONDS);
206 verify(mockSnapshotConsumer, times(1)).accept(any());
207 applyEntriesExecutor.shutdown();
208 } catch (InterruptedException | ExecutionException e) {
214 public void testOnSnapshotOffer() {
216 ReplicatedLog replicatedLog = context.getReplicatedLog();
217 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
218 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
219 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
221 ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
222 new MockRaftActorContext.MockPayload("4", 4));
224 ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
225 new MockRaftActorContext.MockPayload("5", 5));
227 long lastAppliedDuringSnapshotCapture = 3;
228 long lastIndexDuringSnapshotCapture = 5;
229 long electionTerm = 2;
230 String electionVotedFor = "member-2";
232 MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
233 Snapshot snapshot = Snapshot.create(snapshotState,
234 Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
235 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
237 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
238 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
240 sendMessageToSupport(snapshotOffer);
242 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
243 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
244 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
245 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
246 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
247 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
248 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
249 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
250 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
251 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
253 verify(mockCohort).applyRecoverySnapshot(snapshotState);
257 public void testOnRecoveryCompletedWithRemainingBatch() {
258 ReplicatedLog replicatedLog = context.getReplicatedLog();
259 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
260 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
262 sendMessageToSupport(new ApplyJournalEntries(1));
264 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
266 assertEquals("Last applied", 1, context.getLastApplied());
267 assertEquals("Commit index", 1, context.getCommitIndex());
269 InOrder inOrder = Mockito.inOrder(mockCohort);
270 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
272 for (int i = 0; i < replicatedLog.size(); i++) {
273 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
276 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
277 inOrder.verify(mockCohort).getRestoreFromSnapshot();
278 inOrder.verifyNoMoreInteractions();
282 public void testOnRecoveryCompletedWithNoRemainingBatch() {
283 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
285 verify(mockCohort).getRestoreFromSnapshot();
286 verifyNoMoreInteractions(mockCohort);
290 public void testOnDeleteEntries() {
291 ReplicatedLog replicatedLog = context.getReplicatedLog();
292 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
293 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
294 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
296 sendMessageToSupport(new DeleteEntries(1));
298 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
299 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
303 public void testUpdateElectionTerm() {
305 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
307 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
308 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
312 public void testDataRecoveredWithPersistenceDisabled() {
313 doNothing().when(mockCohort).applyRecoverySnapshot(any());
314 doReturn(false).when(mockPersistence).isRecoveryApplicable();
315 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
317 Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
318 Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
319 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
321 sendMessageToSupport(snapshotOffer);
323 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
325 sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
326 sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
328 sendMessageToSupport(new ApplyJournalEntries(4));
330 sendMessageToSupport(new DeleteEntries(5));
332 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
333 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
334 assertEquals("Last applied", -1, context.getLastApplied());
335 assertEquals("Commit index", -1, context.getCommitIndex());
336 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
337 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
339 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
340 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
342 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
344 verify(mockCohort, never()).applyRecoverySnapshot(any());
345 verify(mockCohort, never()).getRestoreFromSnapshot();
346 verifyNoMoreInteractions(mockCohort);
348 verify(mockPersistentProvider).deleteMessages(10L);
351 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
352 return ArgumentMatchers.argThat(other ->
353 term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
357 public void testNoDataRecoveredWithPersistenceDisabled() {
358 doReturn(false).when(mockPersistence).isRecoveryApplicable();
360 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
362 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
363 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
365 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
367 verify(mockCohort).getRestoreFromSnapshot();
368 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
372 public void testServerConfigurationPayloadApplied() {
373 String follower1 = "follower1";
374 String follower2 = "follower2";
375 String follower3 = "follower3";
377 context.addToPeers(follower1, null, VotingState.VOTING);
378 context.addToPeers(follower2, null, VotingState.VOTING);
381 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
382 new ServerInfo(localId, true),
383 new ServerInfo(follower1, true),
384 new ServerInfo(follower2, false),
385 new ServerInfo(follower3, true)));
387 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
390 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
391 assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
392 Sets.newHashSet(context.getPeerIds()));
393 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
394 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
395 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
397 sendMessageToSupport(new ApplyJournalEntries(0));
399 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
400 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
402 //remove existing follower1
403 obj = new ServerConfigurationPayload(Arrays.asList(
404 new ServerInfo(localId, true),
405 new ServerInfo("follower2", true),
406 new ServerInfo("follower3", true)));
408 sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
411 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
412 assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
416 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
417 doReturn(false).when(mockPersistence).isRecoveryApplicable();
419 String follower = "follower";
420 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
421 new ServerInfo(localId, true), new ServerInfo(follower, true)));
423 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
426 assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
430 public void testOnSnapshotOfferWithServerConfiguration() {
431 long electionTerm = 2;
432 String electionVotedFor = "member-2";
433 ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
434 new ServerInfo(localId, true),
435 new ServerInfo("follower1", true),
436 new ServerInfo("follower2", true)));
438 MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
439 Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
440 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
442 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
443 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
445 sendMessageToSupport(snapshotOffer);
447 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
448 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
449 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
450 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
451 assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
452 Sets.newHashSet(context.getPeerIds()));