Always persist and recover election term info
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyInt;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.verify;
15 import static org.mockito.Mockito.verifyNoMoreInteractions;
16 import akka.japi.Procedure;
17 import akka.persistence.RecoveryCompleted;
18 import akka.persistence.SnapshotMetadata;
19 import akka.persistence.SnapshotOffer;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import org.hamcrest.Description;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.mockito.ArgumentMatcher;
27 import org.mockito.InOrder;
28 import org.mockito.Matchers;
29 import org.mockito.Mock;
30 import org.mockito.Mockito;
31 import org.mockito.MockitoAnnotations;
32 import org.opendaylight.controller.cluster.DataPersistenceProvider;
33 import org.opendaylight.controller.cluster.PersistentDataProvider;
34 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
35 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
36 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
37 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
38 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Unit tests for RaftActorRecoverySupport.
44  *
45  * @author Thomas Pantelis
46  */
47 public class RaftActorRecoverySupportTest {
48
49     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
50
51     @Mock
52     private DataPersistenceProvider mockPersistence;
53
54     @Mock
55     private RaftActorBehavior mockBehavior;
56
57     @Mock
58     private RaftActorRecoveryCohort mockCohort;
59
60     @Mock
61     PersistentDataProvider mockPersistentProvider;
62
63     private RaftActorRecoverySupport support;
64
65     private RaftActorContext context;
66     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
67
68     @Before
69     public void setup() {
70         MockitoAnnotations.initMocks(this);
71
72         context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistentProvider, "test", LOG),
73                 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
74
75         support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
76
77         doReturn(true).when(mockPersistence).isRecoveryApplicable();
78
79         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
80     }
81
82     private void sendMessageToSupport(Object message) {
83         sendMessageToSupport(message, false);
84     }
85
86     private void sendMessageToSupport(Object message, boolean expComplete) {
87         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
88         assertEquals("complete", expComplete, complete);
89     }
90
91     @Test
92     public void testOnReplicatedLogEntry() {
93         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
94                 1, new MockRaftActorContext.MockPayload("1", 5));
95
96         sendMessageToSupport(logEntry);
97
98         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
99         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
100         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
101         assertEquals("Last applied", -1, context.getLastApplied());
102         assertEquals("Commit index", -1, context.getCommitIndex());
103         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
104         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
105     }
106
107     @Test
108     public void testOnApplyJournalEntries() {
109         configParams.setJournalRecoveryLogBatchSize(5);
110
111         ReplicatedLog replicatedLog = context.getReplicatedLog();
112         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
113                 0, new MockRaftActorContext.MockPayload("0")));
114         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
115                 1, new MockRaftActorContext.MockPayload("1")));
116         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
117                 2, new MockRaftActorContext.MockPayload("2")));
118         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
119                 3, new MockRaftActorContext.MockPayload("3")));
120         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
121                 4, new MockRaftActorContext.MockPayload("4")));
122         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
123                 5, new MockRaftActorContext.MockPayload("5")));
124
125         sendMessageToSupport(new ApplyJournalEntries(2));
126
127         assertEquals("Last applied", 2, context.getLastApplied());
128         assertEquals("Commit index", 2, context.getCommitIndex());
129
130         sendMessageToSupport(new ApplyJournalEntries(4));
131
132         assertEquals("Last applied", 4, context.getLastApplied());
133         assertEquals("Last applied", 4, context.getLastApplied());
134
135         sendMessageToSupport(new ApplyJournalEntries(5));
136
137         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
138         assertEquals("Last applied", 5, context.getLastApplied());
139         assertEquals("Commit index", 5, context.getCommitIndex());
140         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
141         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
142
143         InOrder inOrder = Mockito.inOrder(mockCohort);
144         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
145
146         for(int i = 0; i < replicatedLog.size() - 1; i++) {
147             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
148         }
149
150         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
151         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
152         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
153
154         inOrder.verifyNoMoreInteractions();
155     }
156
157     @Test
158     public void testOnApplyLogEntries() {
159         ReplicatedLog replicatedLog = context.getReplicatedLog();
160         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
161                 0, new MockRaftActorContext.MockPayload("0")));
162
163         sendMessageToSupport(new ApplyLogEntries(0));
164
165         assertEquals("Last applied", 0, context.getLastApplied());
166         assertEquals("Commit index", 0, context.getCommitIndex());
167     }
168
169     @Test
170     public void testOnSnapshotOffer() {
171
172         ReplicatedLog replicatedLog = context.getReplicatedLog();
173         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
174                 1, new MockRaftActorContext.MockPayload("1")));
175         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
176                 2, new MockRaftActorContext.MockPayload("2")));
177         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
178                 3, new MockRaftActorContext.MockPayload("3")));
179
180         byte[] snapshotBytes = {1,2,3,4,5};
181
182         ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
183                 4, new MockRaftActorContext.MockPayload("4", 4));
184
185         ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
186                 5, new MockRaftActorContext.MockPayload("5", 5));
187
188         int lastAppliedDuringSnapshotCapture = 3;
189         int lastIndexDuringSnapshotCapture = 5;
190
191         Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
192                 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
193
194         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
195         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
196
197         sendMessageToSupport(snapshotOffer);
198
199         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
200         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
201         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
202         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
203         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
204         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
205         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
206
207         verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
208     }
209
210     @Test
211     public void testOnRecoveryCompletedWithRemainingBatch() {
212         ReplicatedLog replicatedLog = context.getReplicatedLog();
213         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
214                 0, new MockRaftActorContext.MockPayload("0")));
215         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
216                 1, new MockRaftActorContext.MockPayload("1")));
217
218         sendMessageToSupport(new ApplyJournalEntries(1));
219
220         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
221
222         assertEquals("Last applied", 1, context.getLastApplied());
223         assertEquals("Commit index", 1, context.getCommitIndex());
224
225         InOrder inOrder = Mockito.inOrder(mockCohort);
226         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
227
228         for(int i = 0; i < replicatedLog.size(); i++) {
229             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
230         }
231
232         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
233
234         inOrder.verifyNoMoreInteractions();
235     }
236
237     @Test
238     public void testOnRecoveryCompletedWithNoRemainingBatch() {
239         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
240
241         verifyNoMoreInteractions(mockCohort);
242     }
243
244     @Test
245     public void testOnDeprecatedDeleteEntries() {
246         ReplicatedLog replicatedLog = context.getReplicatedLog();
247         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
248                 0, new MockRaftActorContext.MockPayload("0")));
249         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
250                 1, new MockRaftActorContext.MockPayload("1")));
251         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
252                 2, new MockRaftActorContext.MockPayload("2")));
253
254         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
255
256         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
257         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
258     }
259
260     @Test
261     public void testOnDeleteEntries() {
262         ReplicatedLog replicatedLog = context.getReplicatedLog();
263         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
264                 0, new MockRaftActorContext.MockPayload("0")));
265         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
266                 1, new MockRaftActorContext.MockPayload("1")));
267         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
268                 2, new MockRaftActorContext.MockPayload("2")));
269
270         sendMessageToSupport(new DeleteEntries(1));
271
272         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
273         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
274     }
275
276     @Test
277     public void testUpdateElectionTerm() {
278
279         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
280
281         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
282         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
283     }
284
285     @Test
286     public void testDeprecatedUpdateElectionTerm() {
287
288         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
289
290         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
291         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
292     }
293
294     @SuppressWarnings("unchecked")
295     @Test
296     public void testDataRecoveredWithPersistenceDisabled() {
297         doReturn(false).when(mockPersistence).isRecoveryApplicable();
298         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
299
300         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
301
302         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
303         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
304
305         sendMessageToSupport(snapshotOffer);
306
307         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
308                 4, new MockRaftActorContext.MockPayload("4")));
309         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
310                 5, new MockRaftActorContext.MockPayload("5")));
311
312         sendMessageToSupport(new ApplyJournalEntries(4));
313
314         sendMessageToSupport(new DeleteEntries(5));
315
316         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
317
318         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
319         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
320         assertEquals("Last applied", -1, context.getLastApplied());
321         assertEquals("Commit index", -1, context.getCommitIndex());
322         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
323         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
324
325         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
326         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
327
328         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
329
330         verifyNoMoreInteractions(mockCohort);
331
332         verify(mockPersistentProvider).deleteMessages(10L);
333         verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
334         verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
335     }
336
337     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
338         return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
339             @Override
340             public boolean matches(Object argument) {
341                 UpdateElectionTerm other = (UpdateElectionTerm) argument;
342                 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
343             }
344
345             @Override
346             public void describeTo(Description description) {
347                 description.appendValue(new UpdateElectionTerm(term, votedFor));
348             }
349         });
350     }
351
352     @Test
353     public void testNoDataRecoveredWithPersistenceDisabled() {
354         doReturn(false).when(mockPersistence).isRecoveryApplicable();
355
356         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
357
358         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
359         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
360
361         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
362
363         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
364     }
365 }