2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import akka.japi.Procedure;
21 import akka.persistence.RecoveryCompleted;
22 import akka.persistence.SnapshotMetadata;
23 import akka.persistence.SnapshotOffer;
24 import akka.persistence.SnapshotSelectionCriteria;
25 import com.google.common.collect.Sets;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import org.hamcrest.Description;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.ArgumentMatcher;
32 import org.mockito.InOrder;
33 import org.mockito.Matchers;
34 import org.mockito.Mock;
35 import org.mockito.Mockito;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.DataPersistenceProvider;
38 import org.opendaylight.controller.cluster.PersistentDataProvider;
39 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
42 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
43 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
44 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
45 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * Unit tests for RaftActorRecoverySupport.
52 * @author Thomas Pantelis
54 public class RaftActorRecoverySupportTest {
56 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
59 private DataPersistenceProvider mockPersistence;
62 private RaftActorBehavior mockBehavior;
65 private RaftActorRecoveryCohort mockCohort;
68 PersistentDataProvider mockPersistentProvider;
70 private RaftActorRecoverySupport support;
72 private RaftActorContext context;
73 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
74 private final String localId = "leader";
79 MockitoAnnotations.initMocks(this);
81 doNothing().when(mockCohort).startLogRecoveryBatch(any(int.class));
82 doNothing().when(mockCohort).applyCurrentLogRecoveryBatch();
83 doNothing().when(mockCohort).applyRecoverySnapshot(any(byte[].class));
84 doNothing().when(mockCohort).appendRecoveredLogEntry(any(Payload.class));
85 doReturn(null).when(mockCohort).getRestoreFromSnapshot();
86 doReturn(true).when(mockPersistence).isRecoveryApplicable();
87 doNothing().when(mockPersistence).deleteMessages(any(long.class));
88 doNothing().when(mockPersistentProvider).deleteMessages(any(long.class));
89 doNothing().when(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
90 doNothing().when(mockPersistentProvider).persist(any(Object.class), any(Procedure.class));
92 context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
93 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
94 support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
95 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
98 private void sendMessageToSupport(Object message) {
99 sendMessageToSupport(message, false);
102 private void sendMessageToSupport(Object message, boolean expComplete) {
103 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
104 assertEquals("complete", expComplete, complete);
108 public void testOnReplicatedLogEntry() {
109 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
110 1, new MockRaftActorContext.MockPayload("1", 5));
112 sendMessageToSupport(logEntry);
114 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
115 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
116 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
117 assertEquals("Last applied", -1, context.getLastApplied());
118 assertEquals("Commit index", -1, context.getCommitIndex());
119 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
120 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
124 public void testOnApplyJournalEntries() {
125 configParams.setJournalRecoveryLogBatchSize(5);
127 ReplicatedLog replicatedLog = context.getReplicatedLog();
128 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
129 0, new MockRaftActorContext.MockPayload("0")));
130 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
131 1, new MockRaftActorContext.MockPayload("1")));
132 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
133 2, new MockRaftActorContext.MockPayload("2")));
134 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
135 3, new MockRaftActorContext.MockPayload("3")));
136 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
137 4, new MockRaftActorContext.MockPayload("4")));
138 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
139 5, new MockRaftActorContext.MockPayload("5")));
141 sendMessageToSupport(new ApplyJournalEntries(2));
143 assertEquals("Last applied", 2, context.getLastApplied());
144 assertEquals("Commit index", 2, context.getCommitIndex());
146 sendMessageToSupport(new ApplyJournalEntries(4));
148 assertEquals("Last applied", 4, context.getLastApplied());
149 assertEquals("Last applied", 4, context.getLastApplied());
151 sendMessageToSupport(new ApplyJournalEntries(5));
153 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
154 assertEquals("Last applied", 5, context.getLastApplied());
155 assertEquals("Commit index", 5, context.getCommitIndex());
156 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
157 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
159 InOrder inOrder = Mockito.inOrder(mockCohort);
160 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
162 for(int i = 0; i < replicatedLog.size() - 1; i++) {
163 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
166 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
167 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
168 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
170 inOrder.verifyNoMoreInteractions();
174 public void testOnApplyLogEntries() {
175 ReplicatedLog replicatedLog = context.getReplicatedLog();
176 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
177 0, new MockRaftActorContext.MockPayload("0")));
179 sendMessageToSupport(new ApplyLogEntries(0));
181 assertEquals("Last applied", 0, context.getLastApplied());
182 assertEquals("Commit index", 0, context.getCommitIndex());
186 public void testOnSnapshotOffer() {
188 ReplicatedLog replicatedLog = context.getReplicatedLog();
189 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
190 1, new MockRaftActorContext.MockPayload("1")));
191 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
192 2, new MockRaftActorContext.MockPayload("2")));
193 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
194 3, new MockRaftActorContext.MockPayload("3")));
196 byte[] snapshotBytes = {1,2,3,4,5};
198 ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
199 4, new MockRaftActorContext.MockPayload("4", 4));
201 ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
202 5, new MockRaftActorContext.MockPayload("5", 5));
204 long lastAppliedDuringSnapshotCapture = 3;
205 long lastIndexDuringSnapshotCapture = 5;
206 long electionTerm = 2;
207 String electionVotedFor = "member-2";
209 Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
210 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
212 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
213 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
215 sendMessageToSupport(snapshotOffer);
217 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
218 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
219 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
220 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
221 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
222 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
223 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
224 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
225 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
226 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
228 verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
232 public void testOnRecoveryCompletedWithRemainingBatch() {
233 ReplicatedLog replicatedLog = context.getReplicatedLog();
234 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
235 0, new MockRaftActorContext.MockPayload("0")));
236 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
237 1, new MockRaftActorContext.MockPayload("1")));
239 sendMessageToSupport(new ApplyJournalEntries(1));
241 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
243 assertEquals("Last applied", 1, context.getLastApplied());
244 assertEquals("Commit index", 1, context.getCommitIndex());
246 InOrder inOrder = Mockito.inOrder(mockCohort);
247 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
249 for(int i = 0; i < replicatedLog.size(); i++) {
250 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
253 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
254 inOrder.verify(mockCohort).getRestoreFromSnapshot();
255 inOrder.verifyNoMoreInteractions();
259 public void testOnRecoveryCompletedWithNoRemainingBatch() {
260 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
262 verify(mockCohort).getRestoreFromSnapshot();
263 verifyNoMoreInteractions(mockCohort);
267 public void testOnDeprecatedDeleteEntries() {
268 ReplicatedLog replicatedLog = context.getReplicatedLog();
269 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
270 0, new MockRaftActorContext.MockPayload("0")));
271 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
272 1, new MockRaftActorContext.MockPayload("1")));
273 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
274 2, new MockRaftActorContext.MockPayload("2")));
276 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
278 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
279 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
283 public void testOnDeleteEntries() {
284 ReplicatedLog replicatedLog = context.getReplicatedLog();
285 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
286 0, new MockRaftActorContext.MockPayload("0")));
287 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
288 1, new MockRaftActorContext.MockPayload("1")));
289 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
290 2, new MockRaftActorContext.MockPayload("2")));
292 sendMessageToSupport(new DeleteEntries(1));
294 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
295 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
299 public void testUpdateElectionTerm() {
301 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
303 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
304 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
308 public void testDeprecatedUpdateElectionTerm() {
310 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
312 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
313 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
316 @SuppressWarnings("unchecked")
318 public void testDataRecoveredWithPersistenceDisabled() {
319 doReturn(false).when(mockPersistence).isRecoveryApplicable();
320 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
322 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
324 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
325 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
327 sendMessageToSupport(snapshotOffer);
329 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
330 4, new MockRaftActorContext.MockPayload("4")));
331 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
332 5, new MockRaftActorContext.MockPayload("5")));
334 sendMessageToSupport(new ApplyJournalEntries(4));
336 sendMessageToSupport(new DeleteEntries(5));
338 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
340 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
341 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
342 assertEquals("Last applied", -1, context.getLastApplied());
343 assertEquals("Commit index", -1, context.getCommitIndex());
344 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
345 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
347 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
348 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
350 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
352 verify(mockCohort).getRestoreFromSnapshot();
353 verifyNoMoreInteractions(mockCohort);
355 verify(mockPersistentProvider).deleteMessages(10L);
356 verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
357 verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
360 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
361 return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
363 public boolean matches(Object argument) {
364 UpdateElectionTerm other = (UpdateElectionTerm) argument;
365 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
369 public void describeTo(Description description) {
370 description.appendValue(new UpdateElectionTerm(term, votedFor));
376 public void testNoDataRecoveredWithPersistenceDisabled() {
377 doReturn(false).when(mockPersistence).isRecoveryApplicable();
379 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
381 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
382 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
384 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
386 verify(mockCohort).getRestoreFromSnapshot();
387 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
391 public void testServerConfigurationPayloadApplied() {
392 String follower1 = "follower1";
393 String follower2 = "follower2";
394 String follower3 = "follower3";
396 context.addToPeers(follower1, null, VotingState.VOTING);
397 context.addToPeers(follower2, null, VotingState.VOTING);
400 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
401 new ServerInfo(localId, true),
402 new ServerInfo(follower1, true),
403 new ServerInfo(follower2, false),
404 new ServerInfo(follower3, true)));
406 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
409 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
410 assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
411 Sets.newHashSet(context.getPeerIds()));
412 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
413 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
414 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
416 sendMessageToSupport(new ApplyJournalEntries(0));
418 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
419 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
421 //remove existing follower1
422 obj = new ServerConfigurationPayload(Arrays.asList(
423 new ServerInfo(localId, true),
424 new ServerInfo("follower2", true),
425 new ServerInfo("follower3", true)));
427 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
430 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
431 assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
435 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
436 doReturn(false).when(mockPersistence).isRecoveryApplicable();
438 String follower = "follower";
439 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
440 new ServerInfo(localId, true), new ServerInfo(follower, true)));
442 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
445 assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
449 public void testOnSnapshotOfferWithServerConfiguration() {
450 long electionTerm = 2;
451 String electionVotedFor = "member-2";
452 ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
453 new ServerInfo(localId, true),
454 new ServerInfo("follower1", true),
455 new ServerInfo("follower2", true)));
457 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
458 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
460 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
461 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
463 sendMessageToSupport(snapshotOffer);
465 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
466 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
467 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
468 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
469 assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
470 Sets.newHashSet(context.getPeerIds()));