/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyInt;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.verify;
15 import static org.mockito.Mockito.verifyNoMoreInteractions;
16 import akka.japi.Procedure;
17 import akka.persistence.RecoveryCompleted;
18 import akka.persistence.SnapshotMetadata;
19 import akka.persistence.SnapshotOffer;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.HashMap;
25 import org.hamcrest.Description;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.mockito.ArgumentMatcher;
29 import org.mockito.InOrder;
30 import org.mockito.Matchers;
31 import org.mockito.Mock;
32 import org.mockito.Mockito;
33 import org.mockito.MockitoAnnotations;
34 import org.opendaylight.controller.cluster.DataPersistenceProvider;
35 import org.opendaylight.controller.cluster.PersistentDataProvider;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
38 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
40 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
41 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
/**
 * Unit tests for RaftActorRecoverySupport.
 *
 * @author Thomas Pantelis
 */
50 public class RaftActorRecoverySupportTest {
52 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
55 private DataPersistenceProvider mockPersistence;
58 private RaftActorBehavior mockBehavior;
61 private RaftActorRecoveryCohort mockCohort;
64 PersistentDataProvider mockPersistentProvider;
66 private RaftActorRecoverySupport support;
68 private RaftActorContext context;
69 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
74 MockitoAnnotations.initMocks(this);
76 context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistentProvider, "test", LOG),
77 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
79 support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
81 doReturn(true).when(mockPersistence).isRecoveryApplicable();
83 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
86 private void sendMessageToSupport(Object message) {
87 sendMessageToSupport(message, false);
90 private void sendMessageToSupport(Object message, boolean expComplete) {
91 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
92 assertEquals("complete", expComplete, complete);
96 public void testOnReplicatedLogEntry() {
97 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
98 1, new MockRaftActorContext.MockPayload("1", 5));
100 sendMessageToSupport(logEntry);
102 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
103 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
104 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
105 assertEquals("Last applied", -1, context.getLastApplied());
106 assertEquals("Commit index", -1, context.getCommitIndex());
107 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
108 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
112 public void testOnApplyJournalEntries() {
113 configParams.setJournalRecoveryLogBatchSize(5);
115 ReplicatedLog replicatedLog = context.getReplicatedLog();
116 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
117 0, new MockRaftActorContext.MockPayload("0")));
118 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
119 1, new MockRaftActorContext.MockPayload("1")));
120 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
121 2, new MockRaftActorContext.MockPayload("2")));
122 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
123 3, new MockRaftActorContext.MockPayload("3")));
124 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
125 4, new MockRaftActorContext.MockPayload("4")));
126 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
127 5, new MockRaftActorContext.MockPayload("5")));
129 sendMessageToSupport(new ApplyJournalEntries(2));
131 assertEquals("Last applied", 2, context.getLastApplied());
132 assertEquals("Commit index", 2, context.getCommitIndex());
134 sendMessageToSupport(new ApplyJournalEntries(4));
136 assertEquals("Last applied", 4, context.getLastApplied());
137 assertEquals("Last applied", 4, context.getLastApplied());
139 sendMessageToSupport(new ApplyJournalEntries(5));
141 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
142 assertEquals("Last applied", 5, context.getLastApplied());
143 assertEquals("Commit index", 5, context.getCommitIndex());
144 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
145 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
147 InOrder inOrder = Mockito.inOrder(mockCohort);
148 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
150 for(int i = 0; i < replicatedLog.size() - 1; i++) {
151 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
154 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
155 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
156 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
158 inOrder.verifyNoMoreInteractions();
162 public void testOnApplyLogEntries() {
163 ReplicatedLog replicatedLog = context.getReplicatedLog();
164 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
165 0, new MockRaftActorContext.MockPayload("0")));
167 sendMessageToSupport(new ApplyLogEntries(0));
169 assertEquals("Last applied", 0, context.getLastApplied());
170 assertEquals("Commit index", 0, context.getCommitIndex());
174 public void testOnSnapshotOffer() {
176 ReplicatedLog replicatedLog = context.getReplicatedLog();
177 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
178 1, new MockRaftActorContext.MockPayload("1")));
179 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
180 2, new MockRaftActorContext.MockPayload("2")));
181 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
182 3, new MockRaftActorContext.MockPayload("3")));
184 byte[] snapshotBytes = {1,2,3,4,5};
186 ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
187 4, new MockRaftActorContext.MockPayload("4", 4));
189 ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
190 5, new MockRaftActorContext.MockPayload("5", 5));
192 long lastAppliedDuringSnapshotCapture = 3;
193 long lastIndexDuringSnapshotCapture = 5;
194 long electionTerm = 2;
195 String electionVotedFor = "member-2";
197 Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
198 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
200 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
201 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
203 sendMessageToSupport(snapshotOffer);
205 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
206 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
207 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
208 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
209 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
210 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
211 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
212 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
213 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
215 verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
219 public void testOnRecoveryCompletedWithRemainingBatch() {
220 ReplicatedLog replicatedLog = context.getReplicatedLog();
221 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
222 0, new MockRaftActorContext.MockPayload("0")));
223 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
224 1, new MockRaftActorContext.MockPayload("1")));
226 sendMessageToSupport(new ApplyJournalEntries(1));
228 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
230 assertEquals("Last applied", 1, context.getLastApplied());
231 assertEquals("Commit index", 1, context.getCommitIndex());
233 InOrder inOrder = Mockito.inOrder(mockCohort);
234 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
236 for(int i = 0; i < replicatedLog.size(); i++) {
237 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
240 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
242 inOrder.verifyNoMoreInteractions();
246 public void testOnRecoveryCompletedWithNoRemainingBatch() {
247 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
249 verifyNoMoreInteractions(mockCohort);
253 public void testOnDeprecatedDeleteEntries() {
254 ReplicatedLog replicatedLog = context.getReplicatedLog();
255 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
256 0, new MockRaftActorContext.MockPayload("0")));
257 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
258 1, new MockRaftActorContext.MockPayload("1")));
259 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
260 2, new MockRaftActorContext.MockPayload("2")));
262 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
264 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
265 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
269 public void testOnDeleteEntries() {
270 ReplicatedLog replicatedLog = context.getReplicatedLog();
271 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
272 0, new MockRaftActorContext.MockPayload("0")));
273 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
274 1, new MockRaftActorContext.MockPayload("1")));
275 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
276 2, new MockRaftActorContext.MockPayload("2")));
278 sendMessageToSupport(new DeleteEntries(1));
280 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
281 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
285 public void testUpdateElectionTerm() {
287 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
289 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
290 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
294 public void testDeprecatedUpdateElectionTerm() {
296 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
298 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
299 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
302 @SuppressWarnings("unchecked")
304 public void testDataRecoveredWithPersistenceDisabled() {
305 doReturn(false).when(mockPersistence).isRecoveryApplicable();
306 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
308 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
310 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
311 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
313 sendMessageToSupport(snapshotOffer);
315 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
316 4, new MockRaftActorContext.MockPayload("4")));
317 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
318 5, new MockRaftActorContext.MockPayload("5")));
320 sendMessageToSupport(new ApplyJournalEntries(4));
322 sendMessageToSupport(new DeleteEntries(5));
324 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
326 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
327 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
328 assertEquals("Last applied", -1, context.getLastApplied());
329 assertEquals("Commit index", -1, context.getCommitIndex());
330 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
331 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
333 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
334 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
336 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
338 verifyNoMoreInteractions(mockCohort);
340 verify(mockPersistentProvider).deleteMessages(10L);
341 verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
342 verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
345 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
346 return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
348 public boolean matches(Object argument) {
349 UpdateElectionTerm other = (UpdateElectionTerm) argument;
350 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
354 public void describeTo(Description description) {
355 description.appendValue(new UpdateElectionTerm(term, votedFor));
361 public void testNoDataRecoveredWithPersistenceDisabled() {
362 doReturn(false).when(mockPersistence).isRecoveryApplicable();
364 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
366 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
367 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
369 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
371 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
375 public void testUpdatePeerIds() {
377 String leader = "Leader";
378 String follower1 = "follower1";
379 String follower2 = "follower2";
380 String follower3 = "follower3";
382 Map<String, String> peerAddresses = new HashMap<>();
384 peerAddresses.put(leader, null);
385 peerAddresses.put(follower1, null);
386 peerAddresses.put(follower2, null);
388 context.addToPeers(leader,null,VotingState.VOTING);
389 context.addToPeers(follower1,null,VotingState.VOTING);
390 context.addToPeers(follower2,null,VotingState.VOTING);
392 assertEquals("Size", 3, context.getPeers().size());
395 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
396 new ServerInfo(leader, true),
397 new ServerInfo(follower1, true),
398 new ServerInfo(follower2, true),
399 new ServerInfo(follower3, true)));
401 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
404 sendMessageToSupport(logEntry);
405 //verify size and names
406 assertEquals("Size", 4, context.getPeers().size());
407 assertEquals("New follower matched", true , context.getPeerIds().contains(follower3));
409 //remove existing follower1
410 obj = new ServerConfigurationPayload(Arrays.asList(
411 new ServerInfo("Leader", true),
412 new ServerInfo("follower2", true),
413 new ServerInfo("follower3", true)));
415 logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj);
417 sendMessageToSupport(logEntry);
418 //verify size and names
419 assertEquals("Size", 3, context.getPeers().size());
420 assertEquals("Removed follower matched", false, context.getPeerIds().contains(follower1));