2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Matchers.anyObject;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
22 import akka.persistence.RecoveryCompleted;
23 import akka.persistence.SnapshotMetadata;
24 import akka.persistence.SnapshotOffer;
25 import com.google.common.collect.Sets;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import org.hamcrest.Description;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.ArgumentMatcher;
32 import org.mockito.InOrder;
33 import org.mockito.Matchers;
34 import org.mockito.Mock;
35 import org.mockito.Mockito;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.DataPersistenceProvider;
38 import org.opendaylight.controller.cluster.PersistentDataProvider;
39 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
40 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
41 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
42 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
43 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
44 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
45 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
46 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
47 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
48 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
53 * Unit tests for RaftActorRecoverySupport.
55 * @author Thomas Pantelis
57 public class RaftActorRecoverySupportTest {
59 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
62 private DataPersistenceProvider mockPersistence;
66 private RaftActorRecoveryCohort mockCohort;
69 private RaftActorSnapshotCohort mockSnapshotCohort;
72 PersistentDataProvider mockPersistentProvider;
74 private RaftActorRecoverySupport support;
76 private RaftActorContext context;
77 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
78 private final String localId = "leader";
83 MockitoAnnotations.initMocks(this);
85 context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
86 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
87 mockPersistence, applyState -> { }, LOG);
89 support = new RaftActorRecoverySupport(context, mockCohort);
91 doReturn(true).when(mockPersistence).isRecoveryApplicable();
93 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
96 private void sendMessageToSupport(Object message) {
97 sendMessageToSupport(message, false);
100 private void sendMessageToSupport(Object message, boolean expComplete) {
101 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
102 assertEquals("complete", expComplete, complete);
106 public void testOnReplicatedLogEntry() {
107 ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
109 sendMessageToSupport(logEntry);
111 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
112 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
113 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
114 assertEquals("Last applied", -1, context.getLastApplied());
115 assertEquals("Commit index", -1, context.getCommitIndex());
116 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
117 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
121 public void testOnApplyJournalEntries() {
122 configParams.setJournalRecoveryLogBatchSize(5);
124 ReplicatedLog replicatedLog = context.getReplicatedLog();
125 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
126 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
127 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
128 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
129 replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
130 replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
132 sendMessageToSupport(new ApplyJournalEntries(2));
134 assertEquals("Last applied", 2, context.getLastApplied());
135 assertEquals("Commit index", 2, context.getCommitIndex());
137 sendMessageToSupport(new ApplyJournalEntries(4));
139 assertEquals("Last applied", 4, context.getLastApplied());
140 assertEquals("Last applied", 4, context.getLastApplied());
142 sendMessageToSupport(new ApplyJournalEntries(5));
144 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
145 assertEquals("Last applied", 5, context.getLastApplied());
146 assertEquals("Commit index", 5, context.getCommitIndex());
147 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
148 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
150 InOrder inOrder = Mockito.inOrder(mockCohort);
151 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
153 for (int i = 0; i < replicatedLog.size() - 1; i++) {
154 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
157 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
158 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
159 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
161 inOrder.verifyNoMoreInteractions();
165 public void testOnSnapshotOffer() {
167 ReplicatedLog replicatedLog = context.getReplicatedLog();
168 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
169 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
170 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
172 ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
173 new MockRaftActorContext.MockPayload("4", 4));
175 ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
176 new MockRaftActorContext.MockPayload("5", 5));
178 long lastAppliedDuringSnapshotCapture = 3;
179 long lastIndexDuringSnapshotCapture = 5;
180 long electionTerm = 2;
181 String electionVotedFor = "member-2";
183 MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
184 Snapshot snapshot = Snapshot.create(snapshotState,
185 Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
186 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
188 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
189 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
191 sendMessageToSupport(snapshotOffer);
193 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
194 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
195 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
196 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
197 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
198 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
199 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
200 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
201 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
202 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
204 verify(mockCohort).applyRecoverySnapshot(snapshotState);
208 public void testOnRecoveryCompletedWithRemainingBatch() {
209 ReplicatedLog replicatedLog = context.getReplicatedLog();
210 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
211 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
213 sendMessageToSupport(new ApplyJournalEntries(1));
215 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
217 assertEquals("Last applied", 1, context.getLastApplied());
218 assertEquals("Commit index", 1, context.getCommitIndex());
220 InOrder inOrder = Mockito.inOrder(mockCohort);
221 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
223 for (int i = 0; i < replicatedLog.size(); i++) {
224 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
227 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
228 inOrder.verify(mockCohort).getRestoreFromSnapshot();
229 inOrder.verifyNoMoreInteractions();
233 public void testOnRecoveryCompletedWithNoRemainingBatch() {
234 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
236 verify(mockCohort).getRestoreFromSnapshot();
237 verifyNoMoreInteractions(mockCohort);
241 public void testOnDeleteEntries() {
242 ReplicatedLog replicatedLog = context.getReplicatedLog();
243 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
244 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
245 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
247 sendMessageToSupport(new DeleteEntries(1));
249 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
250 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
254 public void testUpdateElectionTerm() {
256 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
258 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
259 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
263 public void testDataRecoveredWithPersistenceDisabled() {
264 doNothing().when(mockCohort).applyRecoverySnapshot(anyObject());
265 doReturn(false).when(mockPersistence).isRecoveryApplicable();
266 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
268 Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
269 Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
270 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
272 sendMessageToSupport(snapshotOffer);
274 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
276 sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
277 sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
279 sendMessageToSupport(new ApplyJournalEntries(4));
281 sendMessageToSupport(new DeleteEntries(5));
283 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
284 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
285 assertEquals("Last applied", -1, context.getLastApplied());
286 assertEquals("Commit index", -1, context.getCommitIndex());
287 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
288 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
290 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
291 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
293 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
295 verify(mockCohort, never()).applyRecoverySnapshot(anyObject());
296 verify(mockCohort, never()).getRestoreFromSnapshot();
297 verifyNoMoreInteractions(mockCohort);
299 verify(mockPersistentProvider).deleteMessages(10L);
302 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
303 return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
305 public boolean matches(Object argument) {
306 UpdateElectionTerm other = (UpdateElectionTerm) argument;
307 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
311 public void describeTo(Description description) {
312 description.appendValue(new UpdateElectionTerm(term, votedFor));
318 public void testNoDataRecoveredWithPersistenceDisabled() {
319 doReturn(false).when(mockPersistence).isRecoveryApplicable();
321 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
323 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
324 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
326 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
328 verify(mockCohort).getRestoreFromSnapshot();
329 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
333 public void testServerConfigurationPayloadApplied() {
334 String follower1 = "follower1";
335 String follower2 = "follower2";
336 String follower3 = "follower3";
338 context.addToPeers(follower1, null, VotingState.VOTING);
339 context.addToPeers(follower2, null, VotingState.VOTING);
342 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
343 new ServerInfo(localId, true),
344 new ServerInfo(follower1, true),
345 new ServerInfo(follower2, false),
346 new ServerInfo(follower3, true)));
348 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
351 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
352 assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
353 Sets.newHashSet(context.getPeerIds()));
354 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
355 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
356 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
358 sendMessageToSupport(new ApplyJournalEntries(0));
360 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
361 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
363 //remove existing follower1
364 obj = new ServerConfigurationPayload(Arrays.asList(
365 new ServerInfo(localId, true),
366 new ServerInfo("follower2", true),
367 new ServerInfo("follower3", true)));
369 sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
372 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
373 assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
377 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
378 doReturn(false).when(mockPersistence).isRecoveryApplicable();
380 String follower = "follower";
381 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
382 new ServerInfo(localId, true), new ServerInfo(follower, true)));
384 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
387 assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
391 public void testOnSnapshotOfferWithServerConfiguration() {
392 long electionTerm = 2;
393 String electionVotedFor = "member-2";
394 ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
395 new ServerInfo(localId, true),
396 new ServerInfo("follower1", true),
397 new ServerInfo("follower2", true)));
399 MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
400 Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
401 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
403 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
404 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
406 sendMessageToSupport(snapshotOffer);
408 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
409 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
410 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
411 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
412 assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
413 Sets.newHashSet(context.getPeerIds()));