2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyInt;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.verify;
16 import static org.mockito.Mockito.verifyNoMoreInteractions;
17 import akka.japi.Procedure;
18 import akka.persistence.RecoveryCompleted;
19 import akka.persistence.SnapshotMetadata;
20 import akka.persistence.SnapshotOffer;
21 import akka.persistence.SnapshotSelectionCriteria;
22 import com.google.common.collect.Sets;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import org.hamcrest.Description;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.mockito.ArgumentMatcher;
29 import org.mockito.InOrder;
30 import org.mockito.Matchers;
31 import org.mockito.Mock;
32 import org.mockito.Mockito;
33 import org.mockito.MockitoAnnotations;
34 import org.opendaylight.controller.cluster.DataPersistenceProvider;
35 import org.opendaylight.controller.cluster.PersistentDataProvider;
36 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
40 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
41 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
42 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Unit tests for RaftActorRecoverySupport.
49 * @author Thomas Pantelis
51 public class RaftActorRecoverySupportTest {
53 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
56 private DataPersistenceProvider mockPersistence;
59 private RaftActorBehavior mockBehavior;
62 private RaftActorRecoveryCohort mockCohort;
65 PersistentDataProvider mockPersistentProvider;
67 private RaftActorRecoverySupport support;
69 private RaftActorContext context;
70 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
71 private final String localId = "leader";
76 MockitoAnnotations.initMocks(this);
78 context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
79 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
81 support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
83 doReturn(true).when(mockPersistence).isRecoveryApplicable();
85 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
88 private void sendMessageToSupport(Object message) {
89 sendMessageToSupport(message, false);
92 private void sendMessageToSupport(Object message, boolean expComplete) {
93 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
94 assertEquals("complete", expComplete, complete);
98 public void testOnReplicatedLogEntry() {
99 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
100 1, new MockRaftActorContext.MockPayload("1", 5));
102 sendMessageToSupport(logEntry);
104 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
105 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
106 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
107 assertEquals("Last applied", -1, context.getLastApplied());
108 assertEquals("Commit index", -1, context.getCommitIndex());
109 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
110 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
114 public void testOnApplyJournalEntries() {
115 configParams.setJournalRecoveryLogBatchSize(5);
117 ReplicatedLog replicatedLog = context.getReplicatedLog();
118 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
119 0, new MockRaftActorContext.MockPayload("0")));
120 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
121 1, new MockRaftActorContext.MockPayload("1")));
122 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
123 2, new MockRaftActorContext.MockPayload("2")));
124 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
125 3, new MockRaftActorContext.MockPayload("3")));
126 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
127 4, new MockRaftActorContext.MockPayload("4")));
128 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
129 5, new MockRaftActorContext.MockPayload("5")));
131 sendMessageToSupport(new ApplyJournalEntries(2));
133 assertEquals("Last applied", 2, context.getLastApplied());
134 assertEquals("Commit index", 2, context.getCommitIndex());
136 sendMessageToSupport(new ApplyJournalEntries(4));
138 assertEquals("Last applied", 4, context.getLastApplied());
139 assertEquals("Last applied", 4, context.getLastApplied());
141 sendMessageToSupport(new ApplyJournalEntries(5));
143 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
144 assertEquals("Last applied", 5, context.getLastApplied());
145 assertEquals("Commit index", 5, context.getCommitIndex());
146 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
147 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
149 InOrder inOrder = Mockito.inOrder(mockCohort);
150 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
152 for(int i = 0; i < replicatedLog.size() - 1; i++) {
153 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
156 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
157 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
158 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
160 inOrder.verifyNoMoreInteractions();
164 public void testOnApplyLogEntries() {
165 ReplicatedLog replicatedLog = context.getReplicatedLog();
166 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
167 0, new MockRaftActorContext.MockPayload("0")));
169 sendMessageToSupport(new ApplyLogEntries(0));
171 assertEquals("Last applied", 0, context.getLastApplied());
172 assertEquals("Commit index", 0, context.getCommitIndex());
176 public void testOnSnapshotOffer() {
178 ReplicatedLog replicatedLog = context.getReplicatedLog();
179 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
180 1, new MockRaftActorContext.MockPayload("1")));
181 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
182 2, new MockRaftActorContext.MockPayload("2")));
183 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
184 3, new MockRaftActorContext.MockPayload("3")));
186 byte[] snapshotBytes = {1,2,3,4,5};
188 ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
189 4, new MockRaftActorContext.MockPayload("4", 4));
191 ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
192 5, new MockRaftActorContext.MockPayload("5", 5));
194 long lastAppliedDuringSnapshotCapture = 3;
195 long lastIndexDuringSnapshotCapture = 5;
196 long electionTerm = 2;
197 String electionVotedFor = "member-2";
199 Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
200 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
202 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
203 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
205 sendMessageToSupport(snapshotOffer);
207 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
208 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
209 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
210 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
211 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
212 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
213 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
214 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
215 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
217 verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
221 public void testOnRecoveryCompletedWithRemainingBatch() {
222 ReplicatedLog replicatedLog = context.getReplicatedLog();
223 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
224 0, new MockRaftActorContext.MockPayload("0")));
225 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
226 1, new MockRaftActorContext.MockPayload("1")));
228 sendMessageToSupport(new ApplyJournalEntries(1));
230 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
232 assertEquals("Last applied", 1, context.getLastApplied());
233 assertEquals("Commit index", 1, context.getCommitIndex());
235 InOrder inOrder = Mockito.inOrder(mockCohort);
236 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
238 for(int i = 0; i < replicatedLog.size(); i++) {
239 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
242 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
244 inOrder.verifyNoMoreInteractions();
248 public void testOnRecoveryCompletedWithNoRemainingBatch() {
249 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
251 verifyNoMoreInteractions(mockCohort);
255 public void testOnDeprecatedDeleteEntries() {
256 ReplicatedLog replicatedLog = context.getReplicatedLog();
257 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
258 0, new MockRaftActorContext.MockPayload("0")));
259 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
260 1, new MockRaftActorContext.MockPayload("1")));
261 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
262 2, new MockRaftActorContext.MockPayload("2")));
264 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
266 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
267 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
271 public void testOnDeleteEntries() {
272 ReplicatedLog replicatedLog = context.getReplicatedLog();
273 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
274 0, new MockRaftActorContext.MockPayload("0")));
275 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
276 1, new MockRaftActorContext.MockPayload("1")));
277 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
278 2, new MockRaftActorContext.MockPayload("2")));
280 sendMessageToSupport(new DeleteEntries(1));
282 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
283 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
287 public void testUpdateElectionTerm() {
289 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
291 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
292 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
296 public void testDeprecatedUpdateElectionTerm() {
298 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
300 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
301 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
304 @SuppressWarnings("unchecked")
306 public void testDataRecoveredWithPersistenceDisabled() {
307 doReturn(false).when(mockPersistence).isRecoveryApplicable();
308 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
310 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
312 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
313 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
315 sendMessageToSupport(snapshotOffer);
317 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
318 4, new MockRaftActorContext.MockPayload("4")));
319 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
320 5, new MockRaftActorContext.MockPayload("5")));
322 sendMessageToSupport(new ApplyJournalEntries(4));
324 sendMessageToSupport(new DeleteEntries(5));
326 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
328 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
329 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
330 assertEquals("Last applied", -1, context.getLastApplied());
331 assertEquals("Commit index", -1, context.getCommitIndex());
332 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
333 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
335 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
336 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
338 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
340 verifyNoMoreInteractions(mockCohort);
342 verify(mockPersistentProvider).deleteMessages(10L);
343 verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
344 verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
347 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
348 return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
350 public boolean matches(Object argument) {
351 UpdateElectionTerm other = (UpdateElectionTerm) argument;
352 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
356 public void describeTo(Description description) {
357 description.appendValue(new UpdateElectionTerm(term, votedFor));
363 public void testNoDataRecoveredWithPersistenceDisabled() {
364 doReturn(false).when(mockPersistence).isRecoveryApplicable();
366 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
368 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
369 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
371 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
373 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
377 public void testServerConfigurationPayloadApplied() {
378 String follower1 = "follower1";
379 String follower2 = "follower2";
380 String follower3 = "follower3";
382 context.addToPeers(follower1, null, VotingState.VOTING);
383 context.addToPeers(follower2, null, VotingState.VOTING);
386 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
387 new ServerInfo(localId, true),
388 new ServerInfo(follower1, true),
389 new ServerInfo(follower2, false),
390 new ServerInfo(follower3, true)));
392 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
395 assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
396 Sets.newHashSet(context.getPeerIds()));
397 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
398 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
399 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
401 sendMessageToSupport(new ApplyJournalEntries(0));
403 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
404 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
406 //remove existing follower1
407 obj = new ServerConfigurationPayload(Arrays.asList(
408 new ServerInfo(localId, true),
409 new ServerInfo("follower2", true),
410 new ServerInfo("follower3", true)));
412 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
415 assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
419 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
420 doReturn(false).when(mockPersistence).isRecoveryApplicable();
422 String follower = "follower";
423 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
424 new ServerInfo(localId, true), new ServerInfo(follower, true)));
426 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
429 assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));