/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.verify;
18 import static org.mockito.Mockito.verifyNoMoreInteractions;
19 import akka.japi.Procedure;
20 import akka.persistence.RecoveryCompleted;
21 import akka.persistence.SnapshotMetadata;
22 import akka.persistence.SnapshotOffer;
23 import akka.persistence.SnapshotSelectionCriteria;
24 import com.google.common.collect.Sets;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import org.hamcrest.Description;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.ArgumentMatcher;
31 import org.mockito.InOrder;
32 import org.mockito.Matchers;
33 import org.mockito.Mock;
34 import org.mockito.Mockito;
35 import org.mockito.MockitoAnnotations;
36 import org.opendaylight.controller.cluster.DataPersistenceProvider;
37 import org.opendaylight.controller.cluster.PersistentDataProvider;
38 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
42 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
43 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
44 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
/**
 * Unit tests for RaftActorRecoverySupport.
 *
 * @author Thomas Pantelis
 */
53 public class RaftActorRecoverySupportTest {
55 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
58 private DataPersistenceProvider mockPersistence;
61 private RaftActorBehavior mockBehavior;
64 private RaftActorRecoveryCohort mockCohort;
67 PersistentDataProvider mockPersistentProvider;
69 private RaftActorRecoverySupport support;
71 private RaftActorContext context;
72 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
73 private final String localId = "leader";
78 MockitoAnnotations.initMocks(this);
80 context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
81 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
83 support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
85 doReturn(true).when(mockPersistence).isRecoveryApplicable();
87 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
90 private void sendMessageToSupport(Object message) {
91 sendMessageToSupport(message, false);
94 private void sendMessageToSupport(Object message, boolean expComplete) {
95 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
96 assertEquals("complete", expComplete, complete);
100 public void testOnReplicatedLogEntry() {
101 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
102 1, new MockRaftActorContext.MockPayload("1", 5));
104 sendMessageToSupport(logEntry);
106 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
107 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
108 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
109 assertEquals("Last applied", -1, context.getLastApplied());
110 assertEquals("Commit index", -1, context.getCommitIndex());
111 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
112 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
116 public void testOnApplyJournalEntries() {
117 configParams.setJournalRecoveryLogBatchSize(5);
119 ReplicatedLog replicatedLog = context.getReplicatedLog();
120 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
121 0, new MockRaftActorContext.MockPayload("0")));
122 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
123 1, new MockRaftActorContext.MockPayload("1")));
124 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
125 2, new MockRaftActorContext.MockPayload("2")));
126 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
127 3, new MockRaftActorContext.MockPayload("3")));
128 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
129 4, new MockRaftActorContext.MockPayload("4")));
130 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
131 5, new MockRaftActorContext.MockPayload("5")));
133 sendMessageToSupport(new ApplyJournalEntries(2));
135 assertEquals("Last applied", 2, context.getLastApplied());
136 assertEquals("Commit index", 2, context.getCommitIndex());
138 sendMessageToSupport(new ApplyJournalEntries(4));
140 assertEquals("Last applied", 4, context.getLastApplied());
141 assertEquals("Last applied", 4, context.getLastApplied());
143 sendMessageToSupport(new ApplyJournalEntries(5));
145 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
146 assertEquals("Last applied", 5, context.getLastApplied());
147 assertEquals("Commit index", 5, context.getCommitIndex());
148 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
149 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
151 InOrder inOrder = Mockito.inOrder(mockCohort);
152 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
154 for(int i = 0; i < replicatedLog.size() - 1; i++) {
155 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
158 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
159 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
160 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
162 inOrder.verifyNoMoreInteractions();
166 public void testOnApplyLogEntries() {
167 ReplicatedLog replicatedLog = context.getReplicatedLog();
168 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
169 0, new MockRaftActorContext.MockPayload("0")));
171 sendMessageToSupport(new ApplyLogEntries(0));
173 assertEquals("Last applied", 0, context.getLastApplied());
174 assertEquals("Commit index", 0, context.getCommitIndex());
178 public void testOnSnapshotOffer() {
180 ReplicatedLog replicatedLog = context.getReplicatedLog();
181 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
182 1, new MockRaftActorContext.MockPayload("1")));
183 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
184 2, new MockRaftActorContext.MockPayload("2")));
185 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
186 3, new MockRaftActorContext.MockPayload("3")));
188 byte[] snapshotBytes = {1,2,3,4,5};
190 ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
191 4, new MockRaftActorContext.MockPayload("4", 4));
193 ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
194 5, new MockRaftActorContext.MockPayload("5", 5));
196 long lastAppliedDuringSnapshotCapture = 3;
197 long lastIndexDuringSnapshotCapture = 5;
198 long electionTerm = 2;
199 String electionVotedFor = "member-2";
201 Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
202 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
204 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
205 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
207 sendMessageToSupport(snapshotOffer);
209 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
210 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
211 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
212 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
213 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
214 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
215 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
216 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
217 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
218 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
220 verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
224 public void testOnRecoveryCompletedWithRemainingBatch() {
225 ReplicatedLog replicatedLog = context.getReplicatedLog();
226 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
227 0, new MockRaftActorContext.MockPayload("0")));
228 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
229 1, new MockRaftActorContext.MockPayload("1")));
231 sendMessageToSupport(new ApplyJournalEntries(1));
233 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
235 assertEquals("Last applied", 1, context.getLastApplied());
236 assertEquals("Commit index", 1, context.getCommitIndex());
238 InOrder inOrder = Mockito.inOrder(mockCohort);
239 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
241 for(int i = 0; i < replicatedLog.size(); i++) {
242 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
245 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
246 inOrder.verify(mockCohort).getRestoreFromSnapshot();
247 inOrder.verifyNoMoreInteractions();
251 public void testOnRecoveryCompletedWithNoRemainingBatch() {
252 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
254 verify(mockCohort).getRestoreFromSnapshot();
255 verifyNoMoreInteractions(mockCohort);
259 public void testOnDeprecatedDeleteEntries() {
260 ReplicatedLog replicatedLog = context.getReplicatedLog();
261 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
262 0, new MockRaftActorContext.MockPayload("0")));
263 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
264 1, new MockRaftActorContext.MockPayload("1")));
265 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
266 2, new MockRaftActorContext.MockPayload("2")));
268 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
270 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
271 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
275 public void testOnDeleteEntries() {
276 ReplicatedLog replicatedLog = context.getReplicatedLog();
277 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
278 0, new MockRaftActorContext.MockPayload("0")));
279 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
280 1, new MockRaftActorContext.MockPayload("1")));
281 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
282 2, new MockRaftActorContext.MockPayload("2")));
284 sendMessageToSupport(new DeleteEntries(1));
286 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
287 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
291 public void testUpdateElectionTerm() {
293 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
295 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
296 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
300 public void testDeprecatedUpdateElectionTerm() {
302 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
304 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
305 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
308 @SuppressWarnings("unchecked")
310 public void testDataRecoveredWithPersistenceDisabled() {
311 doReturn(false).when(mockPersistence).isRecoveryApplicable();
312 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
314 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
316 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
317 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
319 sendMessageToSupport(snapshotOffer);
321 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
322 4, new MockRaftActorContext.MockPayload("4")));
323 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
324 5, new MockRaftActorContext.MockPayload("5")));
326 sendMessageToSupport(new ApplyJournalEntries(4));
328 sendMessageToSupport(new DeleteEntries(5));
330 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
332 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
333 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
334 assertEquals("Last applied", -1, context.getLastApplied());
335 assertEquals("Commit index", -1, context.getCommitIndex());
336 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
337 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
339 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
340 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
342 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
344 verify(mockCohort).getRestoreFromSnapshot();
345 verifyNoMoreInteractions(mockCohort);
347 verify(mockPersistentProvider).deleteMessages(10L);
348 verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
349 verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
352 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
353 return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
355 public boolean matches(Object argument) {
356 UpdateElectionTerm other = (UpdateElectionTerm) argument;
357 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
361 public void describeTo(Description description) {
362 description.appendValue(new UpdateElectionTerm(term, votedFor));
368 public void testNoDataRecoveredWithPersistenceDisabled() {
369 doReturn(false).when(mockPersistence).isRecoveryApplicable();
371 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
373 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
374 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
376 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
378 verify(mockCohort).getRestoreFromSnapshot();
379 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
383 public void testServerConfigurationPayloadApplied() {
384 String follower1 = "follower1";
385 String follower2 = "follower2";
386 String follower3 = "follower3";
388 context.addToPeers(follower1, null, VotingState.VOTING);
389 context.addToPeers(follower2, null, VotingState.VOTING);
392 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
393 new ServerInfo(localId, true),
394 new ServerInfo(follower1, true),
395 new ServerInfo(follower2, false),
396 new ServerInfo(follower3, true)));
398 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
401 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
402 assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
403 Sets.newHashSet(context.getPeerIds()));
404 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
405 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
406 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
408 sendMessageToSupport(new ApplyJournalEntries(0));
410 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
411 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
413 //remove existing follower1
414 obj = new ServerConfigurationPayload(Arrays.asList(
415 new ServerInfo(localId, true),
416 new ServerInfo("follower2", true),
417 new ServerInfo("follower3", true)));
419 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
422 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
423 assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
427 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
428 doReturn(false).when(mockPersistence).isRecoveryApplicable();
430 String follower = "follower";
431 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
432 new ServerInfo(localId, true), new ServerInfo(follower, true)));
434 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
437 assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
441 public void testOnSnapshotOfferWithServerConfiguration() {
442 long electionTerm = 2;
443 String electionVotedFor = "member-2";
444 ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
445 new ServerInfo(localId, true),
446 new ServerInfo("follower1", true),
447 new ServerInfo("follower2", true)));
449 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
450 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
452 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
453 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
455 sendMessageToSupport(snapshotOffer);
457 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
458 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
459 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
460 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
461 assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
462 Sets.newHashSet(context.getPeerIds()));