2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.AdditionalMatchers.aryEq;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyInt;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
22 import akka.persistence.RecoveryCompleted;
23 import akka.persistence.SnapshotMetadata;
24 import akka.persistence.SnapshotOffer;
25 import com.google.common.collect.Sets;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import org.hamcrest.Description;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.ArgumentMatcher;
32 import org.mockito.InOrder;
33 import org.mockito.Matchers;
34 import org.mockito.Mock;
35 import org.mockito.Mockito;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.DataPersistenceProvider;
38 import org.opendaylight.controller.cluster.PersistentDataProvider;
39 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
40 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
41 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
42 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
43 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
45 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * Unit tests for RaftActorRecoverySupport.
52 * @author Thomas Pantelis
54 public class RaftActorRecoverySupportTest {
56 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
59 private DataPersistenceProvider mockPersistence;
63 private RaftActorRecoveryCohort mockCohort;
66 PersistentDataProvider mockPersistentProvider;
68 private RaftActorRecoverySupport support;
70 private RaftActorContext context;
71 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
72 private final String localId = "leader";
77 MockitoAnnotations.initMocks(this);
79 context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
80 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
81 mockPersistence, applyState -> { }, LOG);
83 support = new RaftActorRecoverySupport(context, mockCohort);
85 doReturn(true).when(mockPersistence).isRecoveryApplicable();
87 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
90 private void sendMessageToSupport(Object message) {
91 sendMessageToSupport(message, false);
94 private void sendMessageToSupport(Object message, boolean expComplete) {
95 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
96 assertEquals("complete", expComplete, complete);
100 public void testOnReplicatedLogEntry() {
101 ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
103 sendMessageToSupport(logEntry);
105 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
106 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
107 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
108 assertEquals("Last applied", -1, context.getLastApplied());
109 assertEquals("Commit index", -1, context.getCommitIndex());
110 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
111 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
115 public void testOnApplyJournalEntries() {
116 configParams.setJournalRecoveryLogBatchSize(5);
118 ReplicatedLog replicatedLog = context.getReplicatedLog();
119 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
120 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
121 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
122 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
123 replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
124 replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
126 sendMessageToSupport(new ApplyJournalEntries(2));
128 assertEquals("Last applied", 2, context.getLastApplied());
129 assertEquals("Commit index", 2, context.getCommitIndex());
131 sendMessageToSupport(new ApplyJournalEntries(4));
133 assertEquals("Last applied", 4, context.getLastApplied());
134 assertEquals("Last applied", 4, context.getLastApplied());
136 sendMessageToSupport(new ApplyJournalEntries(5));
138 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
139 assertEquals("Last applied", 5, context.getLastApplied());
140 assertEquals("Commit index", 5, context.getCommitIndex());
141 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
142 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
144 InOrder inOrder = Mockito.inOrder(mockCohort);
145 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
147 for (int i = 0; i < replicatedLog.size() - 1; i++) {
148 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
151 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
152 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
153 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
155 inOrder.verifyNoMoreInteractions();
159 public void testOnSnapshotOffer() {
161 ReplicatedLog replicatedLog = context.getReplicatedLog();
162 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
163 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
164 replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
166 byte[] snapshotBytes = {1,2,3,4,5};
168 ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
169 new MockRaftActorContext.MockPayload("4", 4));
171 ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
172 new MockRaftActorContext.MockPayload("5", 5));
174 long lastAppliedDuringSnapshotCapture = 3;
175 long lastIndexDuringSnapshotCapture = 5;
176 long electionTerm = 2;
177 String electionVotedFor = "member-2";
179 Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
180 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
182 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
183 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
185 sendMessageToSupport(snapshotOffer);
187 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
188 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
189 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
190 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
191 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
192 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
193 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
194 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
195 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
196 assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
198 verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
202 public void testOnRecoveryCompletedWithRemainingBatch() {
203 ReplicatedLog replicatedLog = context.getReplicatedLog();
204 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
205 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
207 sendMessageToSupport(new ApplyJournalEntries(1));
209 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
211 assertEquals("Last applied", 1, context.getLastApplied());
212 assertEquals("Commit index", 1, context.getCommitIndex());
214 InOrder inOrder = Mockito.inOrder(mockCohort);
215 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
217 for (int i = 0; i < replicatedLog.size(); i++) {
218 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
221 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
222 inOrder.verify(mockCohort).getRestoreFromSnapshot();
223 inOrder.verifyNoMoreInteractions();
227 public void testOnRecoveryCompletedWithNoRemainingBatch() {
228 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
230 verify(mockCohort).getRestoreFromSnapshot();
231 verifyNoMoreInteractions(mockCohort);
235 public void testOnDeleteEntries() {
236 ReplicatedLog replicatedLog = context.getReplicatedLog();
237 replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
238 replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
239 replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
241 sendMessageToSupport(new DeleteEntries(1));
243 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
244 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
248 public void testUpdateElectionTerm() {
250 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
252 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
253 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
257 public void testDataRecoveredWithPersistenceDisabled() {
258 doNothing().when(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
259 doReturn(false).when(mockPersistence).isRecoveryApplicable();
260 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
262 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
263 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
265 sendMessageToSupport(snapshotOffer);
267 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
269 sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
270 sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
272 sendMessageToSupport(new ApplyJournalEntries(4));
274 sendMessageToSupport(new DeleteEntries(5));
276 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
277 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
278 assertEquals("Last applied", -1, context.getLastApplied());
279 assertEquals("Commit index", -1, context.getCommitIndex());
280 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
281 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
283 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
284 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
286 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
288 verify(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
289 verify(mockCohort, never()).getRestoreFromSnapshot();
290 verifyNoMoreInteractions(mockCohort);
292 verify(mockPersistentProvider).deleteMessages(10L);
295 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
296 return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
298 public boolean matches(Object argument) {
299 UpdateElectionTerm other = (UpdateElectionTerm) argument;
300 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
304 public void describeTo(Description description) {
305 description.appendValue(new UpdateElectionTerm(term, votedFor));
311 public void testNoDataRecoveredWithPersistenceDisabled() {
312 doReturn(false).when(mockPersistence).isRecoveryApplicable();
314 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
316 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
317 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
319 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
321 verify(mockCohort).getRestoreFromSnapshot();
322 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
326 public void testServerConfigurationPayloadApplied() {
327 String follower1 = "follower1";
328 String follower2 = "follower2";
329 String follower3 = "follower3";
331 context.addToPeers(follower1, null, VotingState.VOTING);
332 context.addToPeers(follower2, null, VotingState.VOTING);
335 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
336 new ServerInfo(localId, true),
337 new ServerInfo(follower1, true),
338 new ServerInfo(follower2, false),
339 new ServerInfo(follower3, true)));
341 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
344 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
345 assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
346 Sets.newHashSet(context.getPeerIds()));
347 assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
348 assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
349 assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
351 sendMessageToSupport(new ApplyJournalEntries(0));
353 verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
354 verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
356 //remove existing follower1
357 obj = new ServerConfigurationPayload(Arrays.asList(
358 new ServerInfo(localId, true),
359 new ServerInfo("follower2", true),
360 new ServerInfo("follower3", true)));
362 sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
365 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
366 assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
370 public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
371 doReturn(false).when(mockPersistence).isRecoveryApplicable();
373 String follower = "follower";
374 ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
375 new ServerInfo(localId, true), new ServerInfo(follower, true)));
377 sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
380 assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
384 public void testOnSnapshotOfferWithServerConfiguration() {
385 long electionTerm = 2;
386 String electionVotedFor = "member-2";
387 ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
388 new ServerInfo(localId, true),
389 new ServerInfo("follower1", true),
390 new ServerInfo("follower2", true)));
392 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
393 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
395 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
396 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
398 sendMessageToSupport(snapshotOffer);
400 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
401 assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
402 assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
403 assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
404 assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
405 Sets.newHashSet(context.getPeerIds()));