2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.verify;
18 import static org.mockito.Mockito.verifyNoMoreInteractions;
19 import akka.japi.Procedure;
20 import akka.persistence.RecoveryCompleted;
21 import akka.persistence.SnapshotMetadata;
22 import akka.persistence.SnapshotOffer;
23 import akka.persistence.SnapshotSelectionCriteria;
24 import com.google.common.collect.Sets;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import org.hamcrest.Description;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.ArgumentMatcher;
31 import org.mockito.InOrder;
32 import org.mockito.Matchers;
33 import org.mockito.Mock;
34 import org.mockito.Mockito;
35 import org.mockito.MockitoAnnotations;
36 import org.opendaylight.controller.cluster.DataPersistenceProvider;
37 import org.opendaylight.controller.cluster.PersistentDataProvider;
38 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
42 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
43 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * Unit tests for RaftActorRecoverySupport.
50 * @author Thomas Pantelis
52 public class RaftActorRecoverySupportTest {
54 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
57 private DataPersistenceProvider mockPersistence;
61 private RaftActorRecoveryCohort mockCohort;
64 PersistentDataProvider mockPersistentProvider;
66 private RaftActorRecoverySupport support;
68 private RaftActorContext context;
69 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
70 private final String localId = "leader";
75 MockitoAnnotations.initMocks(this);
77 context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
78 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
80 support = new RaftActorRecoverySupport(context, mockCohort);
82 doReturn(true).when(mockPersistence).isRecoveryApplicable();
84 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
87 private void sendMessageToSupport(Object message) {
88 sendMessageToSupport(message, false);
91 private void sendMessageToSupport(Object message, boolean expComplete) {
92 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
93 assertEquals("complete", expComplete, complete);
97 public void testOnReplicatedLogEntry() {
98 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
99 1, new MockRaftActorContext.MockPayload("1", 5));
101 sendMessageToSupport(logEntry);
103 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
104 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
105 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
106 assertEquals("Last applied", -1, context.getLastApplied());
107 assertEquals("Commit index", -1, context.getCommitIndex());
108 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
109 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
113 public void testOnApplyJournalEntries() {
114 configParams.setJournalRecoveryLogBatchSize(5);
116 ReplicatedLog replicatedLog = context.getReplicatedLog();
117 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
118 0, new MockRaftActorContext.MockPayload("0")));
119 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
120 1, new MockRaftActorContext.MockPayload("1")));
121 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
122 2, new MockRaftActorContext.MockPayload("2")));
123 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
124 3, new MockRaftActorContext.MockPayload("3")));
125 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
126 4, new MockRaftActorContext.MockPayload("4")));
127 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
128 5, new MockRaftActorContext.MockPayload("5")));
130 sendMessageToSupport(new ApplyJournalEntries(2));
132 assertEquals("Last applied", 2, context.getLastApplied());
133 assertEquals("Commit index", 2, context.getCommitIndex());
135 sendMessageToSupport(new ApplyJournalEntries(4));
137 assertEquals("Last applied", 4, context.getLastApplied());
138 assertEquals("Last applied", 4, context.getLastApplied());
140 sendMessageToSupport(new ApplyJournalEntries(5));
142 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
143 assertEquals("Last applied", 5, context.getLastApplied());
144 assertEquals("Commit index", 5, context.getCommitIndex());
145 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
146 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
148 InOrder inOrder = Mockito.inOrder(mockCohort);
149 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
151 for(int i = 0; i < replicatedLog.size() - 1; i++) {
152 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
155 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
156 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
157 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
159 inOrder.verifyNoMoreInteractions();
163 public void testOnApplyLogEntries() {
164 ReplicatedLog replicatedLog = context.getReplicatedLog();
165 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
166 0, new MockRaftActorContext.MockPayload("0")));
168 sendMessageToSupport(new ApplyLogEntries(0));
170 assertEquals("Last applied", 0, context.getLastApplied());
171 assertEquals("Commit index", 0, context.getCommitIndex());
175 public void testOnSnapshotOffer() {
177 ReplicatedLog replicatedLog = context.getReplicatedLog();
178 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
179 1, new MockRaftActorContext.MockPayload("1")));
180 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
181 2, new MockRaftActorContext.MockPayload("2")));
182 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
183 3, new MockRaftActorContext.MockPayload("3")));
185 byte[] snapshotBytes = {1,2,3,4,5};
187 ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
188 4, new MockRaftActorContext.MockPayload("4", 4));
190 ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
191 5, new MockRaftActorContext.MockPayload("5", 5));
193 long lastAppliedDuringSnapshotCapture = 3;
194 long lastIndexDuringSnapshotCapture = 5;
195 long electionTerm = 2;
196 String electionVotedFor = "member-2";
198 Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
199 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
201 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
202 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
204 sendMessageToSupport(snapshotOffer);
206 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
207 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
208 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
209 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
210 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
211 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
212 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
213 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
214 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
215 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
217 verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
221 public void testOnRecoveryCompletedWithRemainingBatch() {
222 ReplicatedLog replicatedLog = context.getReplicatedLog();
223 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
224 0, new MockRaftActorContext.MockPayload("0")));
225 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
226 1, new MockRaftActorContext.MockPayload("1")));
228 sendMessageToSupport(new ApplyJournalEntries(1));
230 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
232 assertEquals("Last applied", 1, context.getLastApplied());
233 assertEquals("Commit index", 1, context.getCommitIndex());
235 InOrder inOrder = Mockito.inOrder(mockCohort);
236 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
238 for(int i = 0; i < replicatedLog.size(); i++) {
239 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
242 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
243 inOrder.verify(mockCohort).getRestoreFromSnapshot();
244 inOrder.verifyNoMoreInteractions();
248 public void testOnRecoveryCompletedWithNoRemainingBatch() {
249 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
251 verify(mockCohort).getRestoreFromSnapshot();
252 verifyNoMoreInteractions(mockCohort);
256 public void testOnDeprecatedDeleteEntries() {
257 ReplicatedLog replicatedLog = context.getReplicatedLog();
258 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
259 0, new MockRaftActorContext.MockPayload("0")));
260 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
261 1, new MockRaftActorContext.MockPayload("1")));
262 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
263 2, new MockRaftActorContext.MockPayload("2")));
265 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
267 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
268 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
272 public void testOnDeleteEntries() {
273 ReplicatedLog replicatedLog = context.getReplicatedLog();
274 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
275 0, new MockRaftActorContext.MockPayload("0")));
276 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
277 1, new MockRaftActorContext.MockPayload("1")));
278 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
279 2, new MockRaftActorContext.MockPayload("2")));
281 sendMessageToSupport(new DeleteEntries(1));
283 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
284 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
288 public void testUpdateElectionTerm() {
290 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
292 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
293 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
297 public void testDeprecatedUpdateElectionTerm() {
299 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
301 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
302 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
305 @SuppressWarnings("unchecked")
307 public void testDataRecoveredWithPersistenceDisabled() {
308 doReturn(false).when(mockPersistence).isRecoveryApplicable();
309 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
311 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
313 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
314 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
316 sendMessageToSupport(snapshotOffer);
318 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
319 4, new MockRaftActorContext.MockPayload("4")));
320 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
321 5, new MockRaftActorContext.MockPayload("5")));
323 sendMessageToSupport(new ApplyJournalEntries(4));
325 sendMessageToSupport(new DeleteEntries(5));
327 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
329 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
330 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
331 assertEquals("Last applied", -1, context.getLastApplied());
332 assertEquals("Commit index", -1, context.getCommitIndex());
333 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
334 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
336 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
337 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
339 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
341 verify(mockCohort).getRestoreFromSnapshot();
342 verifyNoMoreInteractions(mockCohort);
344 verify(mockPersistentProvider).deleteMessages(10L);
345 verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
346 verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
349 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
350 return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
352 public boolean matches(Object argument) {
353 UpdateElectionTerm other = (UpdateElectionTerm) argument;
354 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
358 public void describeTo(Description description) {
359 description.appendValue(new UpdateElectionTerm(term, votedFor));
365 public void testNoDataRecoveredWithPersistenceDisabled() {
366 doReturn(false).when(mockPersistence).isRecoveryApplicable();
368 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
370 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
371 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
373 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
375 verify(mockCohort).getRestoreFromSnapshot();
376 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
380 public void testServerConfigurationPayloadApplied() {
381 String follower1 = "follower1";
382 String follower2 = "follower2";
383 String follower3 = "follower3";
385 context.addToPeers(follower1, null, VotingState.VOTING);
386 context.addToPeers(follower2, null, VotingState.VOTING);
389 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
390 new ServerInfo(localId, true),
391 new ServerInfo(follower1, true),
392 new ServerInfo(follower2, false),
393 new ServerInfo(follower3, true)));
395 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
398 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
399 assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
400 Sets.newHashSet(context.getPeerIds()));
401 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
402 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
403 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
405 sendMessageToSupport(new ApplyJournalEntries(0));
407 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
408 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
410 //remove existing follower1
411 obj = new ServerConfigurationPayload(Arrays.asList(
412 new ServerInfo(localId, true),
413 new ServerInfo("follower2", true),
414 new ServerInfo("follower3", true)));
416 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
419 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
420 assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
424 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
425 doReturn(false).when(mockPersistence).isRecoveryApplicable();
427 String follower = "follower";
428 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
429 new ServerInfo(localId, true), new ServerInfo(follower, true)));
431 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
434 assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
438 public void testOnSnapshotOfferWithServerConfiguration() {
439 long electionTerm = 2;
440 String electionVotedFor = "member-2";
441 ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
442 new ServerInfo(localId, true),
443 new ServerInfo("follower1", true),
444 new ServerInfo("follower2", true)));
446 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
447 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
449 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
450 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
452 sendMessageToSupport(snapshotOffer);
454 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
455 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
456 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
457 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
458 assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
459 Sets.newHashSet(context.getPeerIds()));