2c130314718d9ab092f5dde8546164c61ddd71b4
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import akka.japi.Procedure;
21 import akka.persistence.RecoveryCompleted;
22 import akka.persistence.SnapshotMetadata;
23 import akka.persistence.SnapshotOffer;
24 import akka.persistence.SnapshotSelectionCriteria;
25 import com.google.common.collect.Sets;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import org.hamcrest.Description;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.ArgumentMatcher;
32 import org.mockito.InOrder;
33 import org.mockito.Matchers;
34 import org.mockito.Mock;
35 import org.mockito.Mockito;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.DataPersistenceProvider;
38 import org.opendaylight.controller.cluster.PersistentDataProvider;
39 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
42 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
43 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
44 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
45 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Unit tests for RaftActorRecoverySupport.
51  *
52  * @author Thomas Pantelis
53  */
54 public class RaftActorRecoverySupportTest {
55
56     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
57
58     @Mock
59     private DataPersistenceProvider mockPersistence;
60
61     @Mock
62     private RaftActorBehavior mockBehavior;
63
64     @Mock
65     private RaftActorRecoveryCohort mockCohort;
66
67     @Mock
68     PersistentDataProvider mockPersistentProvider;
69
70     private RaftActorRecoverySupport support;
71
72     private RaftActorContext context;
73     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
74     private final String localId = "leader";
75
76
77     @Before
78     public void setup() {
79         MockitoAnnotations.initMocks(this);
80
81         doNothing().when(mockCohort).startLogRecoveryBatch(any(int.class));
82         doNothing().when(mockCohort).applyCurrentLogRecoveryBatch();
83         doNothing().when(mockCohort).applyRecoverySnapshot(any(byte[].class));
84         doNothing().when(mockCohort).appendRecoveredLogEntry(any(Payload.class));
85         doReturn(null).when(mockCohort).getRestoreFromSnapshot();
86         doReturn(true).when(mockPersistence).isRecoveryApplicable();
87         doNothing().when(mockPersistence).deleteMessages(any(long.class));
88         doNothing().when(mockPersistentProvider).deleteMessages(any(long.class));
89         doNothing().when(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
90         doNothing().when(mockPersistentProvider).persist(any(Object.class), any(Procedure.class));
91
92         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
93                 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
94         support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
95         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
96     }
97
98     private void sendMessageToSupport(Object message) {
99         sendMessageToSupport(message, false);
100     }
101
102     private void sendMessageToSupport(Object message, boolean expComplete) {
103         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
104         assertEquals("complete", expComplete, complete);
105     }
106
107     @Test
108     public void testOnReplicatedLogEntry() {
109         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
110                 1, new MockRaftActorContext.MockPayload("1", 5));
111
112         sendMessageToSupport(logEntry);
113
114         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
115         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
116         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
117         assertEquals("Last applied", -1, context.getLastApplied());
118         assertEquals("Commit index", -1, context.getCommitIndex());
119         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
120         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
121     }
122
123     @Test
124     public void testOnApplyJournalEntries() {
125         configParams.setJournalRecoveryLogBatchSize(5);
126
127         ReplicatedLog replicatedLog = context.getReplicatedLog();
128         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
129                 0, new MockRaftActorContext.MockPayload("0")));
130         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
131                 1, new MockRaftActorContext.MockPayload("1")));
132         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
133                 2, new MockRaftActorContext.MockPayload("2")));
134         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
135                 3, new MockRaftActorContext.MockPayload("3")));
136         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
137                 4, new MockRaftActorContext.MockPayload("4")));
138         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
139                 5, new MockRaftActorContext.MockPayload("5")));
140
141         sendMessageToSupport(new ApplyJournalEntries(2));
142
143         assertEquals("Last applied", 2, context.getLastApplied());
144         assertEquals("Commit index", 2, context.getCommitIndex());
145
146         sendMessageToSupport(new ApplyJournalEntries(4));
147
148         assertEquals("Last applied", 4, context.getLastApplied());
149         assertEquals("Last applied", 4, context.getLastApplied());
150
151         sendMessageToSupport(new ApplyJournalEntries(5));
152
153         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
154         assertEquals("Last applied", 5, context.getLastApplied());
155         assertEquals("Commit index", 5, context.getCommitIndex());
156         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
157         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
158
159         InOrder inOrder = Mockito.inOrder(mockCohort);
160         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
161
162         for(int i = 0; i < replicatedLog.size() - 1; i++) {
163             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
164         }
165
166         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
167         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
168         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
169
170         inOrder.verifyNoMoreInteractions();
171     }
172
173     @Test
174     public void testOnApplyLogEntries() {
175         ReplicatedLog replicatedLog = context.getReplicatedLog();
176         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
177                 0, new MockRaftActorContext.MockPayload("0")));
178
179         sendMessageToSupport(new ApplyLogEntries(0));
180
181         assertEquals("Last applied", 0, context.getLastApplied());
182         assertEquals("Commit index", 0, context.getCommitIndex());
183     }
184
185     @Test
186     public void testOnSnapshotOffer() {
187
188         ReplicatedLog replicatedLog = context.getReplicatedLog();
189         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
190                 1, new MockRaftActorContext.MockPayload("1")));
191         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
192                 2, new MockRaftActorContext.MockPayload("2")));
193         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
194                 3, new MockRaftActorContext.MockPayload("3")));
195
196         byte[] snapshotBytes = {1,2,3,4,5};
197
198         ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
199                 4, new MockRaftActorContext.MockPayload("4", 4));
200
201         ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
202                 5, new MockRaftActorContext.MockPayload("5", 5));
203
204         long lastAppliedDuringSnapshotCapture = 3;
205         long lastIndexDuringSnapshotCapture = 5;
206         long electionTerm = 2;
207         String electionVotedFor = "member-2";
208
209         Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
210                 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
211
212         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
213         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
214
215         sendMessageToSupport(snapshotOffer);
216
217         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
218         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
219         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
220         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
221         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
222         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
223         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
224         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
225         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
226         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
227
228         verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
229     }
230
231     @Test
232     public void testOnRecoveryCompletedWithRemainingBatch() {
233         ReplicatedLog replicatedLog = context.getReplicatedLog();
234         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
235                 0, new MockRaftActorContext.MockPayload("0")));
236         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
237                 1, new MockRaftActorContext.MockPayload("1")));
238
239         sendMessageToSupport(new ApplyJournalEntries(1));
240
241         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
242
243         assertEquals("Last applied", 1, context.getLastApplied());
244         assertEquals("Commit index", 1, context.getCommitIndex());
245
246         InOrder inOrder = Mockito.inOrder(mockCohort);
247         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
248
249         for(int i = 0; i < replicatedLog.size(); i++) {
250             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
251         }
252
253         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
254         inOrder.verify(mockCohort).getRestoreFromSnapshot();
255         inOrder.verifyNoMoreInteractions();
256     }
257
258     @Test
259     public void testOnRecoveryCompletedWithNoRemainingBatch() {
260         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
261
262         verify(mockCohort).getRestoreFromSnapshot();
263         verifyNoMoreInteractions(mockCohort);
264     }
265
266     @Test
267     public void testOnDeprecatedDeleteEntries() {
268         ReplicatedLog replicatedLog = context.getReplicatedLog();
269         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
270                 0, new MockRaftActorContext.MockPayload("0")));
271         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
272                 1, new MockRaftActorContext.MockPayload("1")));
273         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
274                 2, new MockRaftActorContext.MockPayload("2")));
275
276         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
277
278         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
279         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
280     }
281
282     @Test
283     public void testOnDeleteEntries() {
284         ReplicatedLog replicatedLog = context.getReplicatedLog();
285         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
286                 0, new MockRaftActorContext.MockPayload("0")));
287         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
288                 1, new MockRaftActorContext.MockPayload("1")));
289         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
290                 2, new MockRaftActorContext.MockPayload("2")));
291
292         sendMessageToSupport(new DeleteEntries(1));
293
294         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
295         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
296     }
297
298     @Test
299     public void testUpdateElectionTerm() {
300
301         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
302
303         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
304         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
305     }
306
307     @Test
308     public void testDeprecatedUpdateElectionTerm() {
309
310         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
311
312         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
313         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
314     }
315
316     @SuppressWarnings("unchecked")
317     @Test
318     public void testDataRecoveredWithPersistenceDisabled() {
319         doReturn(false).when(mockPersistence).isRecoveryApplicable();
320         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
321
322         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
323
324         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
325         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
326
327         sendMessageToSupport(snapshotOffer);
328
329         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
330                 4, new MockRaftActorContext.MockPayload("4")));
331         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
332                 5, new MockRaftActorContext.MockPayload("5")));
333
334         sendMessageToSupport(new ApplyJournalEntries(4));
335
336         sendMessageToSupport(new DeleteEntries(5));
337
338         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
339
340         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
341         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
342         assertEquals("Last applied", -1, context.getLastApplied());
343         assertEquals("Commit index", -1, context.getCommitIndex());
344         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
345         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
346
347         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
348         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
349
350         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
351
352         verify(mockCohort).getRestoreFromSnapshot();
353         verifyNoMoreInteractions(mockCohort);
354
355         verify(mockPersistentProvider).deleteMessages(10L);
356         verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
357         verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
358     }
359
360     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
361         return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
362             @Override
363             public boolean matches(Object argument) {
364                 UpdateElectionTerm other = (UpdateElectionTerm) argument;
365                 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
366             }
367
368             @Override
369             public void describeTo(Description description) {
370                 description.appendValue(new UpdateElectionTerm(term, votedFor));
371             }
372         });
373     }
374
375     @Test
376     public void testNoDataRecoveredWithPersistenceDisabled() {
377         doReturn(false).when(mockPersistence).isRecoveryApplicable();
378
379         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
380
381         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
382         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
383
384         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
385
386         verify(mockCohort).getRestoreFromSnapshot();
387         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
388     }
389
390     @Test
391     public void testServerConfigurationPayloadApplied() {
392         String follower1 = "follower1";
393         String follower2 = "follower2";
394         String follower3 = "follower3";
395
396         context.addToPeers(follower1, null, VotingState.VOTING);
397         context.addToPeers(follower2, null, VotingState.VOTING);
398
399         //add new Server
400         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
401                 new ServerInfo(localId, true),
402                 new ServerInfo(follower1, true),
403                 new ServerInfo(follower2, false),
404                 new ServerInfo(follower3, true)));
405
406         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
407
408         //verify new peers
409         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
410         assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
411                 Sets.newHashSet(context.getPeerIds()));
412         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
413         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
414         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
415
416         sendMessageToSupport(new ApplyJournalEntries(0));
417
418         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
419         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
420
421         //remove existing follower1
422         obj = new ServerConfigurationPayload(Arrays.asList(
423                 new ServerInfo(localId, true),
424                 new ServerInfo("follower2", true),
425                 new ServerInfo("follower3", true)));
426
427         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
428
429         //verify new peers
430         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
431         assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
432     }
433
434     @Test
435     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
436         doReturn(false).when(mockPersistence).isRecoveryApplicable();
437
438         String follower = "follower";
439         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
440                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
441
442         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
443
444         //verify new peers
445         assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
446     }
447
448     @Test
449     public void testOnSnapshotOfferWithServerConfiguration() {
450         long electionTerm = 2;
451         String electionVotedFor = "member-2";
452         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
453                                                         new ServerInfo(localId, true),
454                                                         new ServerInfo("follower1", true),
455                                                         new ServerInfo("follower2", true)));
456
457         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
458                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
459
460         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
461         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
462
463         sendMessageToSupport(snapshotOffer);
464
465         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
466         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
467         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
468         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
469         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
470             Sets.newHashSet(context.getPeerIds()));
471     }
472 }