2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyInt;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.verify;
15 import static org.mockito.Mockito.verifyNoMoreInteractions;
16 import akka.japi.Procedure;
17 import akka.persistence.RecoveryCompleted;
18 import akka.persistence.SnapshotMetadata;
19 import akka.persistence.SnapshotOffer;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import org.hamcrest.Description;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.mockito.ArgumentMatcher;
27 import org.mockito.InOrder;
28 import org.mockito.Matchers;
29 import org.mockito.Mock;
30 import org.mockito.Mockito;
31 import org.mockito.MockitoAnnotations;
32 import org.opendaylight.controller.cluster.DataPersistenceProvider;
33 import org.opendaylight.controller.cluster.PersistentDataProvider;
34 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
35 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
36 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
37 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
38 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * Unit tests for RaftActorRecoverySupport.
45 * @author Thomas Pantelis
47 public class RaftActorRecoverySupportTest {
49 private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
52 private DataPersistenceProvider mockPersistence;
55 private RaftActorBehavior mockBehavior;
58 private RaftActorRecoveryCohort mockCohort;
61 PersistentDataProvider mockPersistentProvider;
63 private RaftActorRecoverySupport support;
65 private RaftActorContext context;
66 private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
70 MockitoAnnotations.initMocks(this);
72 context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistentProvider, "test", LOG),
73 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
75 support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
77 doReturn(true).when(mockPersistence).isRecoveryApplicable();
79 context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
82 private void sendMessageToSupport(Object message) {
83 sendMessageToSupport(message, false);
86 private void sendMessageToSupport(Object message, boolean expComplete) {
87 boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
88 assertEquals("complete", expComplete, complete);
92 public void testOnReplicatedLogEntry() {
93 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
94 1, new MockRaftActorContext.MockPayload("1", 5));
96 sendMessageToSupport(logEntry);
98 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
99 assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
100 assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
101 assertEquals("Last applied", -1, context.getLastApplied());
102 assertEquals("Commit index", -1, context.getCommitIndex());
103 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
104 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
108 public void testOnApplyJournalEntries() {
109 configParams.setJournalRecoveryLogBatchSize(5);
111 ReplicatedLog replicatedLog = context.getReplicatedLog();
112 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
113 0, new MockRaftActorContext.MockPayload("0")));
114 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
115 1, new MockRaftActorContext.MockPayload("1")));
116 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
117 2, new MockRaftActorContext.MockPayload("2")));
118 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
119 3, new MockRaftActorContext.MockPayload("3")));
120 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
121 4, new MockRaftActorContext.MockPayload("4")));
122 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
123 5, new MockRaftActorContext.MockPayload("5")));
125 sendMessageToSupport(new ApplyJournalEntries(2));
127 assertEquals("Last applied", 2, context.getLastApplied());
128 assertEquals("Commit index", 2, context.getCommitIndex());
130 sendMessageToSupport(new ApplyJournalEntries(4));
132 assertEquals("Last applied", 4, context.getLastApplied());
133 assertEquals("Last applied", 4, context.getLastApplied());
135 sendMessageToSupport(new ApplyJournalEntries(5));
137 assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
138 assertEquals("Last applied", 5, context.getLastApplied());
139 assertEquals("Commit index", 5, context.getCommitIndex());
140 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
141 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
143 InOrder inOrder = Mockito.inOrder(mockCohort);
144 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
146 for(int i = 0; i < replicatedLog.size() - 1; i++) {
147 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
150 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
151 inOrder.verify(mockCohort).startLogRecoveryBatch(5);
152 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
154 inOrder.verifyNoMoreInteractions();
158 public void testOnApplyLogEntries() {
159 ReplicatedLog replicatedLog = context.getReplicatedLog();
160 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
161 0, new MockRaftActorContext.MockPayload("0")));
163 sendMessageToSupport(new ApplyLogEntries(0));
165 assertEquals("Last applied", 0, context.getLastApplied());
166 assertEquals("Commit index", 0, context.getCommitIndex());
170 public void testOnSnapshotOffer() {
172 ReplicatedLog replicatedLog = context.getReplicatedLog();
173 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
174 1, new MockRaftActorContext.MockPayload("1")));
175 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
176 2, new MockRaftActorContext.MockPayload("2")));
177 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
178 3, new MockRaftActorContext.MockPayload("3")));
180 byte[] snapshotBytes = {1,2,3,4,5};
182 ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
183 4, new MockRaftActorContext.MockPayload("4", 4));
185 ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
186 5, new MockRaftActorContext.MockPayload("5", 5));
188 int lastAppliedDuringSnapshotCapture = 3;
189 int lastIndexDuringSnapshotCapture = 5;
191 Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
192 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
194 SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
195 SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
197 sendMessageToSupport(snapshotOffer);
199 assertEquals("Journal log size", 2, context.getReplicatedLog().size());
200 assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
201 assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
202 assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
203 assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
204 assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
205 assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
207 verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
211 public void testOnRecoveryCompletedWithRemainingBatch() {
212 ReplicatedLog replicatedLog = context.getReplicatedLog();
213 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
214 0, new MockRaftActorContext.MockPayload("0")));
215 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
216 1, new MockRaftActorContext.MockPayload("1")));
218 sendMessageToSupport(new ApplyJournalEntries(1));
220 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
222 assertEquals("Last applied", 1, context.getLastApplied());
223 assertEquals("Commit index", 1, context.getCommitIndex());
225 InOrder inOrder = Mockito.inOrder(mockCohort);
226 inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
228 for(int i = 0; i < replicatedLog.size(); i++) {
229 inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
232 inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
234 inOrder.verifyNoMoreInteractions();
238 public void testOnRecoveryCompletedWithNoRemainingBatch() {
239 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
241 verifyNoMoreInteractions(mockCohort);
245 public void testOnDeprecatedDeleteEntries() {
246 ReplicatedLog replicatedLog = context.getReplicatedLog();
247 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
248 0, new MockRaftActorContext.MockPayload("0")));
249 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
250 1, new MockRaftActorContext.MockPayload("1")));
251 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
252 2, new MockRaftActorContext.MockPayload("2")));
254 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
256 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
257 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
261 public void testOnDeleteEntries() {
262 ReplicatedLog replicatedLog = context.getReplicatedLog();
263 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
264 0, new MockRaftActorContext.MockPayload("0")));
265 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
266 1, new MockRaftActorContext.MockPayload("1")));
267 replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
268 2, new MockRaftActorContext.MockPayload("2")));
270 sendMessageToSupport(new DeleteEntries(1));
272 assertEquals("Journal log size", 1, context.getReplicatedLog().size());
273 assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
277 public void testUpdateElectionTerm() {
279 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
281 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
282 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
286 public void testDeprecatedUpdateElectionTerm() {
288 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
290 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
291 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
294 @SuppressWarnings("unchecked")
296 public void testDataRecoveredWithPersistenceDisabled() {
297 doReturn(false).when(mockPersistence).isRecoveryApplicable();
298 doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
300 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
302 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
303 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
305 sendMessageToSupport(snapshotOffer);
307 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
308 4, new MockRaftActorContext.MockPayload("4")));
309 sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
310 5, new MockRaftActorContext.MockPayload("5")));
312 sendMessageToSupport(new ApplyJournalEntries(4));
314 sendMessageToSupport(new DeleteEntries(5));
316 sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
318 assertEquals("Journal log size", 0, context.getReplicatedLog().size());
319 assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
320 assertEquals("Last applied", -1, context.getLastApplied());
321 assertEquals("Commit index", -1, context.getCommitIndex());
322 assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
323 assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
325 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
326 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
328 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
330 verifyNoMoreInteractions(mockCohort);
332 verify(mockPersistentProvider).deleteMessages(10L);
333 verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
334 verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
337 static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
338 return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
340 public boolean matches(Object argument) {
341 UpdateElectionTerm other = (UpdateElectionTerm) argument;
342 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
346 public void describeTo(Description description) {
347 description.appendValue(new UpdateElectionTerm(term, votedFor));
353 public void testNoDataRecoveredWithPersistenceDisabled() {
354 doReturn(false).when(mockPersistence).isRecoveryApplicable();
356 sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
358 assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
359 assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
361 sendMessageToSupport(RecoveryCompleted.getInstance(), true);
363 verifyNoMoreInteractions(mockCohort, mockPersistentProvider);