da02e81fa82b3cd67c39785fe5669ca6d7a054c0
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyInt;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.verify;
16 import static org.mockito.Mockito.verifyNoMoreInteractions;
17 import akka.japi.Procedure;
18 import akka.persistence.RecoveryCompleted;
19 import akka.persistence.SnapshotMetadata;
20 import akka.persistence.SnapshotOffer;
21 import akka.persistence.SnapshotSelectionCriteria;
22 import com.google.common.collect.Sets;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import org.hamcrest.Description;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.mockito.ArgumentMatcher;
29 import org.mockito.InOrder;
30 import org.mockito.Matchers;
31 import org.mockito.Mock;
32 import org.mockito.Mockito;
33 import org.mockito.MockitoAnnotations;
34 import org.opendaylight.controller.cluster.DataPersistenceProvider;
35 import org.opendaylight.controller.cluster.PersistentDataProvider;
36 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
40 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
41 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
42 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Unit tests for RaftActorRecoverySupport.
48  *
49  * @author Thomas Pantelis
50  */
51 public class RaftActorRecoverySupportTest {
52
53     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
54
55     @Mock
56     private DataPersistenceProvider mockPersistence;
57
58     @Mock
59     private RaftActorBehavior mockBehavior;
60
61     @Mock
62     private RaftActorRecoveryCohort mockCohort;
63
64     @Mock
65     PersistentDataProvider mockPersistentProvider;
66
67     private RaftActorRecoverySupport support;
68
69     private RaftActorContext context;
70     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
71     private final String localId = "leader";
72
73
74     @Before
75     public void setup() {
76         MockitoAnnotations.initMocks(this);
77
78         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
79                 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
80
81         support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
82
83         doReturn(true).when(mockPersistence).isRecoveryApplicable();
84
85         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
86     }
87
88     private void sendMessageToSupport(Object message) {
89         sendMessageToSupport(message, false);
90     }
91
92     private void sendMessageToSupport(Object message, boolean expComplete) {
93         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
94         assertEquals("complete", expComplete, complete);
95     }
96
97     @Test
98     public void testOnReplicatedLogEntry() {
99         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
100                 1, new MockRaftActorContext.MockPayload("1", 5));
101
102         sendMessageToSupport(logEntry);
103
104         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
105         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
106         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
107         assertEquals("Last applied", -1, context.getLastApplied());
108         assertEquals("Commit index", -1, context.getCommitIndex());
109         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
110         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
111     }
112
113     @Test
114     public void testOnApplyJournalEntries() {
115         configParams.setJournalRecoveryLogBatchSize(5);
116
117         ReplicatedLog replicatedLog = context.getReplicatedLog();
118         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
119                 0, new MockRaftActorContext.MockPayload("0")));
120         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
121                 1, new MockRaftActorContext.MockPayload("1")));
122         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
123                 2, new MockRaftActorContext.MockPayload("2")));
124         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
125                 3, new MockRaftActorContext.MockPayload("3")));
126         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
127                 4, new MockRaftActorContext.MockPayload("4")));
128         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
129                 5, new MockRaftActorContext.MockPayload("5")));
130
131         sendMessageToSupport(new ApplyJournalEntries(2));
132
133         assertEquals("Last applied", 2, context.getLastApplied());
134         assertEquals("Commit index", 2, context.getCommitIndex());
135
136         sendMessageToSupport(new ApplyJournalEntries(4));
137
138         assertEquals("Last applied", 4, context.getLastApplied());
139         assertEquals("Last applied", 4, context.getLastApplied());
140
141         sendMessageToSupport(new ApplyJournalEntries(5));
142
143         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
144         assertEquals("Last applied", 5, context.getLastApplied());
145         assertEquals("Commit index", 5, context.getCommitIndex());
146         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
147         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
148
149         InOrder inOrder = Mockito.inOrder(mockCohort);
150         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
151
152         for(int i = 0; i < replicatedLog.size() - 1; i++) {
153             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
154         }
155
156         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
157         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
158         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
159
160         inOrder.verifyNoMoreInteractions();
161     }
162
163     @Test
164     public void testOnApplyLogEntries() {
165         ReplicatedLog replicatedLog = context.getReplicatedLog();
166         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
167                 0, new MockRaftActorContext.MockPayload("0")));
168
169         sendMessageToSupport(new ApplyLogEntries(0));
170
171         assertEquals("Last applied", 0, context.getLastApplied());
172         assertEquals("Commit index", 0, context.getCommitIndex());
173     }
174
175     @Test
176     public void testOnSnapshotOffer() {
177
178         ReplicatedLog replicatedLog = context.getReplicatedLog();
179         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
180                 1, new MockRaftActorContext.MockPayload("1")));
181         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
182                 2, new MockRaftActorContext.MockPayload("2")));
183         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
184                 3, new MockRaftActorContext.MockPayload("3")));
185
186         byte[] snapshotBytes = {1,2,3,4,5};
187
188         ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
189                 4, new MockRaftActorContext.MockPayload("4", 4));
190
191         ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
192                 5, new MockRaftActorContext.MockPayload("5", 5));
193
194         long lastAppliedDuringSnapshotCapture = 3;
195         long lastIndexDuringSnapshotCapture = 5;
196         long electionTerm = 2;
197         String electionVotedFor = "member-2";
198
199         Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
200                 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
201
202         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
203         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
204
205         sendMessageToSupport(snapshotOffer);
206
207         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
208         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
209         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
210         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
211         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
212         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
213         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
214         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
215         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
216
217         verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
218     }
219
220     @Test
221     public void testOnRecoveryCompletedWithRemainingBatch() {
222         ReplicatedLog replicatedLog = context.getReplicatedLog();
223         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
224                 0, new MockRaftActorContext.MockPayload("0")));
225         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
226                 1, new MockRaftActorContext.MockPayload("1")));
227
228         sendMessageToSupport(new ApplyJournalEntries(1));
229
230         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
231
232         assertEquals("Last applied", 1, context.getLastApplied());
233         assertEquals("Commit index", 1, context.getCommitIndex());
234
235         InOrder inOrder = Mockito.inOrder(mockCohort);
236         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
237
238         for(int i = 0; i < replicatedLog.size(); i++) {
239             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
240         }
241
242         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
243
244         inOrder.verifyNoMoreInteractions();
245     }
246
247     @Test
248     public void testOnRecoveryCompletedWithNoRemainingBatch() {
249         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
250
251         verifyNoMoreInteractions(mockCohort);
252     }
253
254     @Test
255     public void testOnDeprecatedDeleteEntries() {
256         ReplicatedLog replicatedLog = context.getReplicatedLog();
257         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
258                 0, new MockRaftActorContext.MockPayload("0")));
259         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
260                 1, new MockRaftActorContext.MockPayload("1")));
261         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
262                 2, new MockRaftActorContext.MockPayload("2")));
263
264         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
265
266         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
267         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
268     }
269
270     @Test
271     public void testOnDeleteEntries() {
272         ReplicatedLog replicatedLog = context.getReplicatedLog();
273         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
274                 0, new MockRaftActorContext.MockPayload("0")));
275         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
276                 1, new MockRaftActorContext.MockPayload("1")));
277         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
278                 2, new MockRaftActorContext.MockPayload("2")));
279
280         sendMessageToSupport(new DeleteEntries(1));
281
282         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
283         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
284     }
285
286     @Test
287     public void testUpdateElectionTerm() {
288
289         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
290
291         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
292         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
293     }
294
295     @Test
296     public void testDeprecatedUpdateElectionTerm() {
297
298         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
299
300         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
301         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
302     }
303
304     @SuppressWarnings("unchecked")
305     @Test
306     public void testDataRecoveredWithPersistenceDisabled() {
307         doReturn(false).when(mockPersistence).isRecoveryApplicable();
308         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
309
310         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
311
312         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
313         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
314
315         sendMessageToSupport(snapshotOffer);
316
317         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
318                 4, new MockRaftActorContext.MockPayload("4")));
319         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
320                 5, new MockRaftActorContext.MockPayload("5")));
321
322         sendMessageToSupport(new ApplyJournalEntries(4));
323
324         sendMessageToSupport(new DeleteEntries(5));
325
326         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
327
328         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
329         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
330         assertEquals("Last applied", -1, context.getLastApplied());
331         assertEquals("Commit index", -1, context.getCommitIndex());
332         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
333         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
334
335         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
336         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
337
338         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
339
340         verifyNoMoreInteractions(mockCohort);
341
342         verify(mockPersistentProvider).deleteMessages(10L);
343         verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
344         verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
345     }
346
347     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
348         return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
349             @Override
350             public boolean matches(Object argument) {
351                 UpdateElectionTerm other = (UpdateElectionTerm) argument;
352                 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
353             }
354
355             @Override
356             public void describeTo(Description description) {
357                 description.appendValue(new UpdateElectionTerm(term, votedFor));
358             }
359         });
360     }
361
362     @Test
363     public void testNoDataRecoveredWithPersistenceDisabled() {
364         doReturn(false).when(mockPersistence).isRecoveryApplicable();
365
366         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
367
368         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
369         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
370
371         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
372
373         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
374     }
375
376     @Test
377     public void testServerConfigurationPayloadApplied() {
378         String follower1 = "follower1";
379         String follower2 = "follower2";
380         String follower3 = "follower3";
381
382         context.addToPeers(follower1, null, VotingState.VOTING);
383         context.addToPeers(follower2, null, VotingState.VOTING);
384
385         //add new Server
386         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
387                 new ServerInfo(localId, true),
388                 new ServerInfo(follower1, true),
389                 new ServerInfo(follower2, false),
390                 new ServerInfo(follower3, true)));
391
392         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
393
394         //verify new peers
395         assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
396                 Sets.newHashSet(context.getPeerIds()));
397         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
398         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
399         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
400
401         sendMessageToSupport(new ApplyJournalEntries(0));
402
403         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
404         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
405
406         //remove existing follower1
407         obj = new ServerConfigurationPayload(Arrays.asList(
408                 new ServerInfo(localId, true),
409                 new ServerInfo("follower2", true),
410                 new ServerInfo("follower3", true)));
411
412         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
413
414         //verify new peers
415         assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
416     }
417
418     @Test
419     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
420         doReturn(false).when(mockPersistence).isRecoveryApplicable();
421
422         String follower = "follower";
423         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
424                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
425
426         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
427
428         //verify new peers
429         assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
430     }
431 }