2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.verify;
18 import static org.mockito.Mockito.verifyNoMoreInteractions;
19 import akka.japi.Procedure;
20 import akka.persistence.RecoveryCompleted;
21 import akka.persistence.SnapshotMetadata;
22 import akka.persistence.SnapshotOffer;
23 import akka.persistence.SnapshotSelectionCriteria;
24 import com.google.common.collect.Sets;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import org.hamcrest.Description;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.ArgumentMatcher;
31 import org.mockito.InOrder;
32 import org.mockito.Matchers;
33 import org.mockito.Mock;
34 import org.mockito.Mockito;
35 import org.mockito.MockitoAnnotations;
36 import org.opendaylight.controller.cluster.DataPersistenceProvider;
37 import org.opendaylight.controller.cluster.PersistentDataProvider;
38 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
40 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
42 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Unit tests for RaftActorRecoverySupport.
49 * @author Thomas Pantelis
51 public class RaftActorRecoverySupportTest {
53 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
56 private DataPersistenceProvider mockPersistence;
60 private RaftActorRecoveryCohort mockCohort;
63 PersistentDataProvider mockPersistentProvider;
65 private RaftActorRecoverySupport support;
67 private RaftActorContext context;
68 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
69 private final String localId = "leader";
74 MockitoAnnotations.initMocks(this);
76 context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
77 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
79 support = new RaftActorRecoverySupport(context, mockCohort);
81 doReturn(true).when(mockPersistence).isRecoveryApplicable();
83 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
86 private void sendMessageToSupport(Object message) {
87 sendMessageToSupport(message, false);
90 private void sendMessageToSupport(Object message, boolean expComplete) {
91 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
92 assertEquals("complete", expComplete, complete);
96 public void testOnReplicatedLogEntry() {
97 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
98 1, new MockRaftActorContext.MockPayload("1", 5));
100 sendMessageToSupport(logEntry);
102 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
103 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
104 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
105 assertEquals("Last applied", -1, context.getLastApplied());
106 assertEquals("Commit index", -1, context.getCommitIndex());
107 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
108 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
112 public void testOnApplyJournalEntries() {
113 configParams.setJournalRecoveryLogBatchSize(5);
115 ReplicatedLog replicatedLog = context.getReplicatedLog();
116 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
117 0, new MockRaftActorContext.MockPayload("0")));
118 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
119 1, new MockRaftActorContext.MockPayload("1")));
120 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
121 2, new MockRaftActorContext.MockPayload("2")));
122 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
123 3, new MockRaftActorContext.MockPayload("3")));
124 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
125 4, new MockRaftActorContext.MockPayload("4")));
126 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
127 5, new MockRaftActorContext.MockPayload("5")));
129 sendMessageToSupport(new ApplyJournalEntries(2));
131 assertEquals("Last applied", 2, context.getLastApplied());
132 assertEquals("Commit index", 2, context.getCommitIndex());
134 sendMessageToSupport(new ApplyJournalEntries(4));
136 assertEquals("Last applied", 4, context.getLastApplied());
137 assertEquals("Last applied", 4, context.getLastApplied());
139 sendMessageToSupport(new ApplyJournalEntries(5));
141 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
142 assertEquals("Last applied", 5, context.getLastApplied());
143 assertEquals("Commit index", 5, context.getCommitIndex());
144 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
145 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
147 InOrder inOrder = Mockito.inOrder(mockCohort);
148 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
150 for(int i = 0; i < replicatedLog.size() - 1; i++) {
151 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
154 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
155 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
156 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
158 inOrder.verifyNoMoreInteractions();
162 public void testOnSnapshotOffer() {
164 ReplicatedLog replicatedLog = context.getReplicatedLog();
165 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
166 1, new MockRaftActorContext.MockPayload("1")));
167 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
168 2, new MockRaftActorContext.MockPayload("2")));
169 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
170 3, new MockRaftActorContext.MockPayload("3")));
172 byte[] snapshotBytes = {1,2,3,4,5};
174 ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
175 4, new MockRaftActorContext.MockPayload("4", 4));
177 ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
178 5, new MockRaftActorContext.MockPayload("5", 5));
180 long lastAppliedDuringSnapshotCapture = 3;
181 long lastIndexDuringSnapshotCapture = 5;
182 long electionTerm = 2;
183 String electionVotedFor = "member-2";
185 Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
186 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
188 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
189 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
191 sendMessageToSupport(snapshotOffer);
193 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
194 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
195 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
196 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
197 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
198 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
199 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
200 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
201 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
202 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
204 verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
208 public void testOnRecoveryCompletedWithRemainingBatch() {
209 ReplicatedLog replicatedLog = context.getReplicatedLog();
210 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
211 0, new MockRaftActorContext.MockPayload("0")));
212 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
213 1, new MockRaftActorContext.MockPayload("1")));
215 sendMessageToSupport(new ApplyJournalEntries(1));
217 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
219 assertEquals("Last applied", 1, context.getLastApplied());
220 assertEquals("Commit index", 1, context.getCommitIndex());
222 InOrder inOrder = Mockito.inOrder(mockCohort);
223 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
225 for(int i = 0; i < replicatedLog.size(); i++) {
226 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
229 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
230 inOrder.verify(mockCohort).getRestoreFromSnapshot();
231 inOrder.verifyNoMoreInteractions();
235 public void testOnRecoveryCompletedWithNoRemainingBatch() {
236 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
238 verify(mockCohort).getRestoreFromSnapshot();
239 verifyNoMoreInteractions(mockCohort);
243 public void testOnDeprecatedDeleteEntries() {
244 ReplicatedLog replicatedLog = context.getReplicatedLog();
245 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
246 0, new MockRaftActorContext.MockPayload("0")));
247 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
248 1, new MockRaftActorContext.MockPayload("1")));
249 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
250 2, new MockRaftActorContext.MockPayload("2")));
252 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
254 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
255 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
259 public void testOnDeleteEntries() {
260 ReplicatedLog replicatedLog = context.getReplicatedLog();
261 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
262 0, new MockRaftActorContext.MockPayload("0")));
263 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
264 1, new MockRaftActorContext.MockPayload("1")));
265 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
266 2, new MockRaftActorContext.MockPayload("2")));
268 sendMessageToSupport(new DeleteEntries(1));
270 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
271 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
275 public void testUpdateElectionTerm() {
277 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
279 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
280 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
284 public void testDeprecatedUpdateElectionTerm() {
286 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
288 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
289 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
292 @SuppressWarnings("unchecked")
294 public void testDataRecoveredWithPersistenceDisabled() {
295 doReturn(false).when(mockPersistence).isRecoveryApplicable();
296 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
298 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
300 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
301 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
303 sendMessageToSupport(snapshotOffer);
305 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
306 4, new MockRaftActorContext.MockPayload("4")));
307 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
308 5, new MockRaftActorContext.MockPayload("5")));
310 sendMessageToSupport(new ApplyJournalEntries(4));
312 sendMessageToSupport(new DeleteEntries(5));
314 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
316 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
317 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
318 assertEquals("Last applied", -1, context.getLastApplied());
319 assertEquals("Commit index", -1, context.getCommitIndex());
320 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
321 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
323 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
324 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
326 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
328 verify(mockCohort).getRestoreFromSnapshot();
329 verifyNoMoreInteractions(mockCohort);
331 verify(mockPersistentProvider).deleteMessages(10L);
332 verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
333 verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
336 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
337 return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
339 public boolean matches(Object argument) {
340 UpdateElectionTerm other = (UpdateElectionTerm) argument;
341 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
345 public void describeTo(Description description) {
346 description.appendValue(new UpdateElectionTerm(term, votedFor));
352 public void testNoDataRecoveredWithPersistenceDisabled() {
353 doReturn(false).when(mockPersistence).isRecoveryApplicable();
355 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
357 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
358 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
360 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
362 verify(mockCohort).getRestoreFromSnapshot();
363 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
367 public void testServerConfigurationPayloadApplied() {
368 String follower1 = "follower1";
369 String follower2 = "follower2";
370 String follower3 = "follower3";
372 context.addToPeers(follower1, null, VotingState.VOTING);
373 context.addToPeers(follower2, null, VotingState.VOTING);
376 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
377 new ServerInfo(localId, true),
378 new ServerInfo(follower1, true),
379 new ServerInfo(follower2, false),
380 new ServerInfo(follower3, true)));
382 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
385 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
386 assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
387 Sets.newHashSet(context.getPeerIds()));
388 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
389 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
390 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
392 sendMessageToSupport(new ApplyJournalEntries(0));
394 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
395 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
397 //remove existing follower1
398 obj = new ServerConfigurationPayload(Arrays.asList(
399 new ServerInfo(localId, true),
400 new ServerInfo("follower2", true),
401 new ServerInfo("follower3", true)));
403 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
406 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
407 assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
411 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
412 doReturn(false).when(mockPersistence).isRecoveryApplicable();
414 String follower = "follower";
415 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
416 new ServerInfo(localId, true), new ServerInfo(follower, true)));
418 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
421 assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
425 public void testOnSnapshotOfferWithServerConfiguration() {
426 long electionTerm = 2;
427 String electionVotedFor = "member-2";
428 ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
429 new ServerInfo(localId, true),
430 new ServerInfo("follower1", true),
431 new ServerInfo("follower2", true)));
433 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
434 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
436 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
437 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
439 sendMessageToSupport(snapshotOffer);
441 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
442 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
443 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
444 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
445 assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
446 Sets.newHashSet(context.getPeerIds()));