2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.AdditionalMatchers.aryEq;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyInt;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
22 import akka.persistence.RecoveryCompleted;
23 import akka.persistence.SnapshotMetadata;
24 import akka.persistence.SnapshotOffer;
25 import com.google.common.collect.Sets;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import org.hamcrest.Description;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.ArgumentMatcher;
32 import org.mockito.InOrder;
33 import org.mockito.Matchers;
34 import org.mockito.Mock;
35 import org.mockito.Mockito;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.DataPersistenceProvider;
38 import org.opendaylight.controller.cluster.PersistentDataProvider;
39 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
40 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
41 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
42 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
43 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
44 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
49 * Unit tests for RaftActorRecoverySupport.
51 * @author Thomas Pantelis
53 public class RaftActorRecoverySupportTest {
55 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
58 private DataPersistenceProvider mockPersistence;
62 private RaftActorRecoveryCohort mockCohort;
65 PersistentDataProvider mockPersistentProvider;
67 private RaftActorRecoverySupport support;
69 private RaftActorContext context;
70 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
71 private final String localId = "leader";
76 MockitoAnnotations.initMocks(this);
78 context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
79 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
81 support = new RaftActorRecoverySupport(context, mockCohort);
83 doReturn(true).when(mockPersistence).isRecoveryApplicable();
85 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
88 private void sendMessageToSupport(Object message) {
89 sendMessageToSupport(message, false);
92 private void sendMessageToSupport(Object message, boolean expComplete) {
93 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
94 assertEquals("complete", expComplete, complete);
98 public void testOnReplicatedLogEntry() {
99 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
100 1, new MockRaftActorContext.MockPayload("1", 5));
102 sendMessageToSupport(logEntry);
104 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
105 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
106 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
107 assertEquals("Last applied", -1, context.getLastApplied());
108 assertEquals("Commit index", -1, context.getCommitIndex());
109 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
110 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
114 public void testOnApplyJournalEntries() {
115 configParams.setJournalRecoveryLogBatchSize(5);
117 ReplicatedLog replicatedLog = context.getReplicatedLog();
118 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
119 0, new MockRaftActorContext.MockPayload("0")));
120 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
121 1, new MockRaftActorContext.MockPayload("1")));
122 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
123 2, new MockRaftActorContext.MockPayload("2")));
124 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
125 3, new MockRaftActorContext.MockPayload("3")));
126 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
127 4, new MockRaftActorContext.MockPayload("4")));
128 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
129 5, new MockRaftActorContext.MockPayload("5")));
131 sendMessageToSupport(new ApplyJournalEntries(2));
133 assertEquals("Last applied", 2, context.getLastApplied());
134 assertEquals("Commit index", 2, context.getCommitIndex());
136 sendMessageToSupport(new ApplyJournalEntries(4));
138 assertEquals("Last applied", 4, context.getLastApplied());
139 assertEquals("Last applied", 4, context.getLastApplied());
141 sendMessageToSupport(new ApplyJournalEntries(5));
143 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
144 assertEquals("Last applied", 5, context.getLastApplied());
145 assertEquals("Commit index", 5, context.getCommitIndex());
146 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
147 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
149 InOrder inOrder = Mockito.inOrder(mockCohort);
150 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
152 for (int i = 0; i < replicatedLog.size() - 1; i++) {
153 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
156 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
157 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
158 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
160 inOrder.verifyNoMoreInteractions();
164 public void testOnSnapshotOffer() {
166 ReplicatedLog replicatedLog = context.getReplicatedLog();
167 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
168 1, new MockRaftActorContext.MockPayload("1")));
169 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
170 2, new MockRaftActorContext.MockPayload("2")));
171 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
172 3, new MockRaftActorContext.MockPayload("3")));
174 byte[] snapshotBytes = {1,2,3,4,5};
176 ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
177 4, new MockRaftActorContext.MockPayload("4", 4));
179 ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
180 5, new MockRaftActorContext.MockPayload("5", 5));
182 long lastAppliedDuringSnapshotCapture = 3;
183 long lastIndexDuringSnapshotCapture = 5;
184 long electionTerm = 2;
185 String electionVotedFor = "member-2";
187 Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
188 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
190 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
191 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
193 sendMessageToSupport(snapshotOffer);
195 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
196 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
197 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
198 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
199 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
200 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
201 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
202 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
203 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
204 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
206 verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
210 public void testOnRecoveryCompletedWithRemainingBatch() {
211 ReplicatedLog replicatedLog = context.getReplicatedLog();
212 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
213 0, new MockRaftActorContext.MockPayload("0")));
214 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
215 1, new MockRaftActorContext.MockPayload("1")));
217 sendMessageToSupport(new ApplyJournalEntries(1));
219 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
221 assertEquals("Last applied", 1, context.getLastApplied());
222 assertEquals("Commit index", 1, context.getCommitIndex());
224 InOrder inOrder = Mockito.inOrder(mockCohort);
225 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
227 for (int i = 0; i < replicatedLog.size(); i++) {
228 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
231 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
232 inOrder.verify(mockCohort).getRestoreFromSnapshot();
233 inOrder.verifyNoMoreInteractions();
237 public void testOnRecoveryCompletedWithNoRemainingBatch() {
238 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
240 verify(mockCohort).getRestoreFromSnapshot();
241 verifyNoMoreInteractions(mockCohort);
245 public void testOnDeleteEntries() {
246 ReplicatedLog replicatedLog = context.getReplicatedLog();
247 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
248 0, new MockRaftActorContext.MockPayload("0")));
249 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
250 1, new MockRaftActorContext.MockPayload("1")));
251 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
252 2, new MockRaftActorContext.MockPayload("2")));
254 sendMessageToSupport(new DeleteEntries(1));
256 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
257 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
261 public void testUpdateElectionTerm() {
263 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
265 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
266 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
270 public void testDataRecoveredWithPersistenceDisabled() {
271 doNothing().when(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
272 doReturn(false).when(mockPersistence).isRecoveryApplicable();
273 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
275 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
276 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
278 sendMessageToSupport(snapshotOffer);
280 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
282 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
283 4, new MockRaftActorContext.MockPayload("4")));
284 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
285 5, new MockRaftActorContext.MockPayload("5")));
287 sendMessageToSupport(new ApplyJournalEntries(4));
289 sendMessageToSupport(new DeleteEntries(5));
291 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
292 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
293 assertEquals("Last applied", -1, context.getLastApplied());
294 assertEquals("Commit index", -1, context.getCommitIndex());
295 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
296 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
298 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
299 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
301 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
303 verify(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
304 verify(mockCohort, never()).getRestoreFromSnapshot();
305 verifyNoMoreInteractions(mockCohort);
307 verify(mockPersistentProvider).deleteMessages(10L);
310 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
311 return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
313 public boolean matches(Object argument) {
314 UpdateElectionTerm other = (UpdateElectionTerm) argument;
315 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
319 public void describeTo(Description description) {
320 description.appendValue(new UpdateElectionTerm(term, votedFor));
326 public void testNoDataRecoveredWithPersistenceDisabled() {
327 doReturn(false).when(mockPersistence).isRecoveryApplicable();
329 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
331 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
332 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
334 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
336 verify(mockCohort).getRestoreFromSnapshot();
337 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
341 public void testServerConfigurationPayloadApplied() {
342 String follower1 = "follower1";
343 String follower2 = "follower2";
344 String follower3 = "follower3";
346 context.addToPeers(follower1, null, VotingState.VOTING);
347 context.addToPeers(follower2, null, VotingState.VOTING);
350 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
351 new ServerInfo(localId, true),
352 new ServerInfo(follower1, true),
353 new ServerInfo(follower2, false),
354 new ServerInfo(follower3, true)));
356 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
359 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
360 assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
361 Sets.newHashSet(context.getPeerIds()));
362 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
363 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
364 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
366 sendMessageToSupport(new ApplyJournalEntries(0));
368 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
369 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
371 //remove existing follower1
372 obj = new ServerConfigurationPayload(Arrays.asList(
373 new ServerInfo(localId, true),
374 new ServerInfo("follower2", true),
375 new ServerInfo("follower3", true)));
377 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
380 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
381 assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
385 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
386 doReturn(false).when(mockPersistence).isRecoveryApplicable();
388 String follower = "follower";
389 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
390 new ServerInfo(localId, true), new ServerInfo(follower, true)));
392 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
395 assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
399 public void testOnSnapshotOfferWithServerConfiguration() {
400 long electionTerm = 2;
401 String electionVotedFor = "member-2";
402 ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
403 new ServerInfo(localId, true),
404 new ServerInfo("follower1", true),
405 new ServerInfo("follower2", true)));
407 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
408 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
410 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
411 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
413 sendMessageToSupport(snapshotOffer);
415 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
416 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
417 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
418 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
419 assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
420 Sets.newHashSet(context.getPeerIds()));