2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.ArgumentMatchers.anyInt;
15 import static org.mockito.ArgumentMatchers.anyObject;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
22 import akka.persistence.RecoveryCompleted;
23 import akka.persistence.SnapshotMetadata;
24 import akka.persistence.SnapshotOffer;
25 import com.google.common.collect.Sets;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.ArgumentMatchers;
31 import org.mockito.InOrder;
32 import org.mockito.Mock;
33 import org.mockito.Mockito;
34 import org.mockito.MockitoAnnotations;
35 import org.opendaylight.controller.cluster.DataPersistenceProvider;
36 import org.opendaylight.controller.cluster.PersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
39 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
40 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
41 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
42 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
43 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
45 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
46 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
51 * Unit tests for RaftActorRecoverySupport.
53 * @author Thomas Pantelis
// Test fixture for RaftActorRecoverySupport: the fields below hold the collaborators handed to the
// support instance and the context it operates on.
55 public class RaftActorRecoverySupportTest {
57 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
// Persistence provider passed to the RaftActorContextImpl; tests stub its isRecoveryApplicable() answer.
// NOTE(review): presumably @Mock-annotated (org.mockito.Mock is imported) - annotation not visible here.
60 private DataPersistenceProvider mockPersistence;
// Recovery cohort passed to the RaftActorRecoverySupport constructor; tests verify the calls made on it.
64 private RaftActorRecoveryCohort mockCohort;
// Not referenced by any code visible in this section of the file.
67 private RaftActorSnapshotCohort mockSnapshotCohort;
// Provider passed to ElectionTermImpl and to every handleRecoveryMessage() call.
70 PersistentDataProvider mockPersistentProvider;
// Object under test; assigned in the setup code.
72 private RaftActorRecoverySupport support;
// Context shared with the object under test; assertions read the recovered state from it.
74 private RaftActorContext context;
// Config instance given to the context; tests may adjust it (e.g. the journal recovery batch size).
75 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
// Member id used for the context under test and for the local entry in server configurations.
76 private final String localId = "leader";
// Body of the fixture setup (the enclosing method header is not visible in this excerpt).
// NOTE(review): presumably populates the @Mock-annotated fields of this instance - annotations not visible.
81 MockitoAnnotations.initMocks(this);
// Build the context for member localId, with an election term backed by mockPersistentProvider,
// no peers (empty map), the shared configParams, mockPersistence and a lambda that ignores its argument.
83 context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
84 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
85 mockPersistence, applyState -> { }, LOG);
// Object under test, wired to the context and the recovery cohort.
87 support = new RaftActorRecoverySupport(context, mockCohort);
// Default stubbing: recovery is applicable; individual tests override this with doReturn(false).
89 doReturn(true).when(mockPersistence).isRecoveryApplicable();
// Install a new replicated log created from the context.
91 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
94 private void sendMessageToSupport(final Object message) {
95 sendMessageToSupport(message, false);
98 private void sendMessageToSupport(final Object message, final boolean expComplete) {
99 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
100 assertEquals("complete", expComplete, complete);
104 public void testOnReplicatedLogEntry() {
105 ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
107 sendMessageToSupport(logEntry);
109 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
110 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
111 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
112 assertEquals("Last applied", -1, context.getLastApplied());
113 assertEquals("Commit index", -1, context.getCommitIndex());
114 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
115 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
119 public void testOnApplyJournalEntries() {
120 configParams.setJournalRecoveryLogBatchSize(5);
122 ReplicatedLog replicatedLog = context.getReplicatedLog();
123 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
124 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
125 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
126 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
127 replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
128 replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
130 sendMessageToSupport(new ApplyJournalEntries(2));
132 assertEquals("Last applied", 2, context.getLastApplied());
133 assertEquals("Commit index", 2, context.getCommitIndex());
135 sendMessageToSupport(new ApplyJournalEntries(4));
137 assertEquals("Last applied", 4, context.getLastApplied());
138 assertEquals("Last applied", 4, context.getLastApplied());
140 sendMessageToSupport(new ApplyJournalEntries(5));
142 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
143 assertEquals("Last applied", 5, context.getLastApplied());
144 assertEquals("Commit index", 5, context.getCommitIndex());
145 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
146 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
148 InOrder inOrder = Mockito.inOrder(mockCohort);
149 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
151 for (int i = 0; i < replicatedLog.size() - 1; i++) {
152 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
155 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
156 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
157 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
159 inOrder.verifyNoMoreInteractions();
163 public void testOnSnapshotOffer() {
165 ReplicatedLog replicatedLog = context.getReplicatedLog();
166 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
167 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
168 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
170 ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
171 new MockRaftActorContext.MockPayload("4", 4));
173 ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
174 new MockRaftActorContext.MockPayload("5", 5));
176 long lastAppliedDuringSnapshotCapture = 3;
177 long lastIndexDuringSnapshotCapture = 5;
178 long electionTerm = 2;
179 String electionVotedFor = "member-2";
181 MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
182 Snapshot snapshot = Snapshot.create(snapshotState,
183 Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
184 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
186 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
187 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
189 sendMessageToSupport(snapshotOffer);
191 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
192 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
193 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
194 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
195 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
196 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
197 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
198 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
199 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
200 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
202 verify(mockCohort).applyRecoverySnapshot(snapshotState);
206 public void testOnRecoveryCompletedWithRemainingBatch() {
207 ReplicatedLog replicatedLog = context.getReplicatedLog();
208 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
209 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
211 sendMessageToSupport(new ApplyJournalEntries(1));
213 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
215 assertEquals("Last applied", 1, context.getLastApplied());
216 assertEquals("Commit index", 1, context.getCommitIndex());
218 InOrder inOrder = Mockito.inOrder(mockCohort);
219 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
221 for (int i = 0; i < replicatedLog.size(); i++) {
222 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
225 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
226 inOrder.verify(mockCohort).getRestoreFromSnapshot();
227 inOrder.verifyNoMoreInteractions();
231 public void testOnRecoveryCompletedWithNoRemainingBatch() {
232 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
234 verify(mockCohort).getRestoreFromSnapshot();
235 verifyNoMoreInteractions(mockCohort);
239 public void testOnDeleteEntries() {
240 ReplicatedLog replicatedLog = context.getReplicatedLog();
241 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
242 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
243 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
245 sendMessageToSupport(new DeleteEntries(1));
247 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
248 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
252 public void testUpdateElectionTerm() {
254 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
256 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
257 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
261 public void testDataRecoveredWithPersistenceDisabled() {
262 doNothing().when(mockCohort).applyRecoverySnapshot(anyObject());
263 doReturn(false).when(mockPersistence).isRecoveryApplicable();
264 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
266 Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
267 Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
268 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
270 sendMessageToSupport(snapshotOffer);
272 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
274 sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
275 sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
277 sendMessageToSupport(new ApplyJournalEntries(4));
279 sendMessageToSupport(new DeleteEntries(5));
281 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
282 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
283 assertEquals("Last applied", -1, context.getLastApplied());
284 assertEquals("Commit index", -1, context.getCommitIndex());
285 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
286 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
288 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
289 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
291 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
293 verify(mockCohort, never()).applyRecoverySnapshot(anyObject());
294 verify(mockCohort, never()).getRestoreFromSnapshot();
295 verifyNoMoreInteractions(mockCohort);
297 verify(mockPersistentProvider).deleteMessages(10L);
300 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
301 return ArgumentMatchers.argThat(
302 other -> term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
306 public void testNoDataRecoveredWithPersistenceDisabled() {
307 doReturn(false).when(mockPersistence).isRecoveryApplicable();
309 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
311 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
312 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
314 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
316 verify(mockCohort).getRestoreFromSnapshot();
317 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
321 public void testServerConfigurationPayloadApplied() {
322 String follower1 = "follower1";
323 String follower2 = "follower2";
324 String follower3 = "follower3";
326 context.addToPeers(follower1, null, VotingState.VOTING);
327 context.addToPeers(follower2, null, VotingState.VOTING);
330 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
331 new ServerInfo(localId, true),
332 new ServerInfo(follower1, true),
333 new ServerInfo(follower2, false),
334 new ServerInfo(follower3, true)));
336 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
339 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
340 assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
341 Sets.newHashSet(context.getPeerIds()));
342 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
343 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
344 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
346 sendMessageToSupport(new ApplyJournalEntries(0));
348 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
349 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
351 //remove existing follower1
352 obj = new ServerConfigurationPayload(Arrays.asList(
353 new ServerInfo(localId, true),
354 new ServerInfo("follower2", true),
355 new ServerInfo("follower3", true)));
357 sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
360 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
361 assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
365 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
366 doReturn(false).when(mockPersistence).isRecoveryApplicable();
368 String follower = "follower";
369 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
370 new ServerInfo(localId, true), new ServerInfo(follower, true)));
372 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
375 assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
379 public void testOnSnapshotOfferWithServerConfiguration() {
380 long electionTerm = 2;
381 String electionVotedFor = "member-2";
382 ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
383 new ServerInfo(localId, true),
384 new ServerInfo("follower1", true),
385 new ServerInfo("follower2", true)));
387 MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
388 Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
389 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
391 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
392 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
394 sendMessageToSupport(snapshotOffer);
396 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
397 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
398 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
399 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
400 assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
401 Sets.newHashSet(context.getPeerIds()));