2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Matchers.anyObject;
16 import static org.mockito.Mockito.doAnswer;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
23 import akka.persistence.RecoveryCompleted;
24 import akka.persistence.SnapshotMetadata;
25 import akka.persistence.SnapshotOffer;
26 import com.google.common.collect.Sets;
27 import java.io.Serializable;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
31 import org.apache.commons.lang3.SerializationUtils;
32 import org.hamcrest.Description;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.ArgumentMatcher;
36 import org.mockito.InOrder;
37 import org.mockito.Matchers;
38 import org.mockito.Mock;
39 import org.mockito.Mockito;
40 import org.mockito.MockitoAnnotations;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.PersistentDataProvider;
43 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
44 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
45 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
46 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
47 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
48 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
49 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
50 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
51 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
52 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * Unit tests for RaftActorRecoverySupport.
59 * @author Thomas Pantelis
61 public class RaftActorRecoverySupportTest {
63 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
66 private DataPersistenceProvider mockPersistence;
70 private RaftActorRecoveryCohort mockCohort;
73 private RaftActorSnapshotCohort mockSnapshotCohort;
76 PersistentDataProvider mockPersistentProvider;
78 private RaftActorRecoverySupport support;
80 private RaftActorContext context;
81 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
82 private final String localId = "leader";
87 MockitoAnnotations.initMocks(this);
89 context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
90 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
91 mockPersistence, applyState -> { }, LOG);
93 support = new RaftActorRecoverySupport(context, mockCohort);
95 doReturn(true).when(mockPersistence).isRecoveryApplicable();
97 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
100 private void sendMessageToSupport(Object message) {
101 sendMessageToSupport(message, false);
104 private void sendMessageToSupport(Object message, boolean expComplete) {
105 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
106 assertEquals("complete", expComplete, complete);
110 public void testOnReplicatedLogEntry() {
111 ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
113 sendMessageToSupport(logEntry);
115 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
116 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
117 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
118 assertEquals("Last applied", -1, context.getLastApplied());
119 assertEquals("Commit index", -1, context.getCommitIndex());
120 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
121 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
125 public void testOnApplyJournalEntries() {
126 configParams.setJournalRecoveryLogBatchSize(5);
128 ReplicatedLog replicatedLog = context.getReplicatedLog();
129 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
130 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
131 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
132 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
133 replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
134 replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
136 sendMessageToSupport(new ApplyJournalEntries(2));
138 assertEquals("Last applied", 2, context.getLastApplied());
139 assertEquals("Commit index", 2, context.getCommitIndex());
141 sendMessageToSupport(new ApplyJournalEntries(4));
143 assertEquals("Last applied", 4, context.getLastApplied());
144 assertEquals("Last applied", 4, context.getLastApplied());
146 sendMessageToSupport(new ApplyJournalEntries(5));
148 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
149 assertEquals("Last applied", 5, context.getLastApplied());
150 assertEquals("Commit index", 5, context.getCommitIndex());
151 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
152 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
154 InOrder inOrder = Mockito.inOrder(mockCohort);
155 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
157 for (int i = 0; i < replicatedLog.size() - 1; i++) {
158 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
161 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
162 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
163 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
165 inOrder.verifyNoMoreInteractions();
169 public void testOnSnapshotOffer() {
171 ReplicatedLog replicatedLog = context.getReplicatedLog();
172 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
173 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
174 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
176 ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
177 new MockRaftActorContext.MockPayload("4", 4));
179 ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
180 new MockRaftActorContext.MockPayload("5", 5));
182 long lastAppliedDuringSnapshotCapture = 3;
183 long lastIndexDuringSnapshotCapture = 5;
184 long electionTerm = 2;
185 String electionVotedFor = "member-2";
187 MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
188 Snapshot snapshot = Snapshot.create(snapshotState,
189 Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
190 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
192 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
193 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
195 sendMessageToSupport(snapshotOffer);
197 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
198 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
199 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
200 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
201 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
202 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
203 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
204 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
205 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
206 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
208 verify(mockCohort).applyRecoverySnapshot(snapshotState);
213 public void testOnSnapshotOfferWithPreCarbonSnapshot() {
215 ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
216 new MockRaftActorContext.MockPayload("4", 4));
218 ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
219 new MockRaftActorContext.MockPayload("5", 5));
221 long lastAppliedDuringSnapshotCapture = 3;
222 long lastIndexDuringSnapshotCapture = 5;
223 long electionTerm = 2;
224 String electionVotedFor = "member-2";
226 List<Object> snapshotData = Arrays.asList(new MockPayload("1"));
227 final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData);
229 org.opendaylight.controller.cluster.raft.Snapshot snapshot = org.opendaylight.controller.cluster.raft.Snapshot
230 .create(SerializationUtils.serialize((Serializable) snapshotData),
231 Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
232 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
234 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
235 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
237 doAnswer(invocation -> new MockSnapshotState(SerializationUtils.deserialize(
238 invocation.getArgumentAt(0, byte[].class))))
239 .when(mockCohort).deserializePreCarbonSnapshot(any(byte[].class));
241 sendMessageToSupport(snapshotOffer);
243 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
244 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
245 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
246 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
247 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
248 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
249 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
250 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
251 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
252 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
254 verify(mockCohort).applyRecoverySnapshot(snapshotState);
258 public void testOnRecoveryCompletedWithRemainingBatch() {
259 ReplicatedLog replicatedLog = context.getReplicatedLog();
260 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
261 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
263 sendMessageToSupport(new ApplyJournalEntries(1));
265 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
267 assertEquals("Last applied", 1, context.getLastApplied());
268 assertEquals("Commit index", 1, context.getCommitIndex());
270 InOrder inOrder = Mockito.inOrder(mockCohort);
271 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
273 for (int i = 0; i < replicatedLog.size(); i++) {
274 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
277 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
278 inOrder.verify(mockCohort).getRestoreFromSnapshot();
279 inOrder.verifyNoMoreInteractions();
283 public void testOnRecoveryCompletedWithNoRemainingBatch() {
284 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
286 verify(mockCohort).getRestoreFromSnapshot();
287 verifyNoMoreInteractions(mockCohort);
291 public void testOnDeleteEntries() {
292 ReplicatedLog replicatedLog = context.getReplicatedLog();
293 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
294 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
295 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
297 sendMessageToSupport(new DeleteEntries(1));
299 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
300 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
304 public void testUpdateElectionTerm() {
306 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
308 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
309 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
313 public void testDataRecoveredWithPersistenceDisabled() {
314 doNothing().when(mockCohort).applyRecoverySnapshot(anyObject());
315 doReturn(false).when(mockPersistence).isRecoveryApplicable();
316 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
318 Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
319 Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
320 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
322 sendMessageToSupport(snapshotOffer);
324 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
326 sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
327 sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
329 sendMessageToSupport(new ApplyJournalEntries(4));
331 sendMessageToSupport(new DeleteEntries(5));
333 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
334 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
335 assertEquals("Last applied", -1, context.getLastApplied());
336 assertEquals("Commit index", -1, context.getCommitIndex());
337 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
338 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
340 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
341 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
343 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
345 verify(mockCohort, never()).applyRecoverySnapshot(anyObject());
346 verify(mockCohort, never()).getRestoreFromSnapshot();
347 verifyNoMoreInteractions(mockCohort);
349 verify(mockPersistentProvider).deleteMessages(10L);
352 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
353 return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
355 public boolean matches(Object argument) {
356 UpdateElectionTerm other = (UpdateElectionTerm) argument;
357 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
361 public void describeTo(Description description) {
362 description.appendValue(new UpdateElectionTerm(term, votedFor));
368 public void testNoDataRecoveredWithPersistenceDisabled() {
369 doReturn(false).when(mockPersistence).isRecoveryApplicable();
371 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
373 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
374 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
376 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
378 verify(mockCohort).getRestoreFromSnapshot();
379 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
383 public void testServerConfigurationPayloadApplied() {
384 String follower1 = "follower1";
385 String follower2 = "follower2";
386 String follower3 = "follower3";
388 context.addToPeers(follower1, null, VotingState.VOTING);
389 context.addToPeers(follower2, null, VotingState.VOTING);
392 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
393 new ServerInfo(localId, true),
394 new ServerInfo(follower1, true),
395 new ServerInfo(follower2, false),
396 new ServerInfo(follower3, true)));
398 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
401 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
402 assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
403 Sets.newHashSet(context.getPeerIds()));
404 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
405 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
406 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
408 sendMessageToSupport(new ApplyJournalEntries(0));
410 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
411 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
413 //remove existing follower1
414 obj = new ServerConfigurationPayload(Arrays.asList(
415 new ServerInfo(localId, true),
416 new ServerInfo("follower2", true),
417 new ServerInfo("follower3", true)));
419 sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
422 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
423 assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
427 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
428 doReturn(false).when(mockPersistence).isRecoveryApplicable();
430 String follower = "follower";
431 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
432 new ServerInfo(localId, true), new ServerInfo(follower, true)));
434 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
437 assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
441 public void testOnSnapshotOfferWithServerConfiguration() {
442 long electionTerm = 2;
443 String electionVotedFor = "member-2";
444 ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
445 new ServerInfo(localId, true),
446 new ServerInfo("follower1", true),
447 new ServerInfo("follower2", true)));
449 MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
450 Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
451 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
453 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
454 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
456 sendMessageToSupport(snapshotOffer);
458 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
459 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
460 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
461 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
462 assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
463 Sets.newHashSet(context.getPeerIds()));