af19b49dbe974252d21e14007283979a616cd897
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.verify;
18 import static org.mockito.Mockito.verifyNoMoreInteractions;
19 import akka.japi.Procedure;
20 import akka.persistence.RecoveryCompleted;
21 import akka.persistence.SnapshotMetadata;
22 import akka.persistence.SnapshotOffer;
23 import akka.persistence.SnapshotSelectionCriteria;
24 import com.google.common.collect.Sets;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import org.hamcrest.Description;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.ArgumentMatcher;
31 import org.mockito.InOrder;
32 import org.mockito.Matchers;
33 import org.mockito.Mock;
34 import org.mockito.Mockito;
35 import org.mockito.MockitoAnnotations;
36 import org.opendaylight.controller.cluster.DataPersistenceProvider;
37 import org.opendaylight.controller.cluster.PersistentDataProvider;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
40 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
41 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
42 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
43 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * Unit tests for RaftActorRecoverySupport.
49  *
50  * @author Thomas Pantelis
51  */
52 public class RaftActorRecoverySupportTest {
53
54     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
55
56     @Mock
57     private DataPersistenceProvider mockPersistence;
58
59
60     @Mock
61     private RaftActorRecoveryCohort mockCohort;
62
63     @Mock
64     PersistentDataProvider mockPersistentProvider;
65
66     private RaftActorRecoverySupport support;
67
68     private RaftActorContext context;
69     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
70     private final String localId = "leader";
71
72
73     @Before
74     public void setup() {
75         MockitoAnnotations.initMocks(this);
76
77         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
78                 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
79
80         support = new RaftActorRecoverySupport(context, mockCohort);
81
82         doReturn(true).when(mockPersistence).isRecoveryApplicable();
83
84         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
85     }
86
87     private void sendMessageToSupport(Object message) {
88         sendMessageToSupport(message, false);
89     }
90
91     private void sendMessageToSupport(Object message, boolean expComplete) {
92         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
93         assertEquals("complete", expComplete, complete);
94     }
95
96     @Test
97     public void testOnReplicatedLogEntry() {
98         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
99                 1, new MockRaftActorContext.MockPayload("1", 5));
100
101         sendMessageToSupport(logEntry);
102
103         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
104         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
105         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
106         assertEquals("Last applied", -1, context.getLastApplied());
107         assertEquals("Commit index", -1, context.getCommitIndex());
108         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
109         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
110     }
111
112     @Test
113     public void testOnApplyJournalEntries() {
114         configParams.setJournalRecoveryLogBatchSize(5);
115
116         ReplicatedLog replicatedLog = context.getReplicatedLog();
117         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
118                 0, new MockRaftActorContext.MockPayload("0")));
119         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
120                 1, new MockRaftActorContext.MockPayload("1")));
121         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
122                 2, new MockRaftActorContext.MockPayload("2")));
123         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
124                 3, new MockRaftActorContext.MockPayload("3")));
125         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
126                 4, new MockRaftActorContext.MockPayload("4")));
127         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
128                 5, new MockRaftActorContext.MockPayload("5")));
129
130         sendMessageToSupport(new ApplyJournalEntries(2));
131
132         assertEquals("Last applied", 2, context.getLastApplied());
133         assertEquals("Commit index", 2, context.getCommitIndex());
134
135         sendMessageToSupport(new ApplyJournalEntries(4));
136
137         assertEquals("Last applied", 4, context.getLastApplied());
138         assertEquals("Last applied", 4, context.getLastApplied());
139
140         sendMessageToSupport(new ApplyJournalEntries(5));
141
142         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
143         assertEquals("Last applied", 5, context.getLastApplied());
144         assertEquals("Commit index", 5, context.getCommitIndex());
145         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
146         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
147
148         InOrder inOrder = Mockito.inOrder(mockCohort);
149         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
150
151         for(int i = 0; i < replicatedLog.size() - 1; i++) {
152             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
153         }
154
155         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
156         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
157         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
158
159         inOrder.verifyNoMoreInteractions();
160     }
161
162     @Test
163     public void testOnSnapshotOffer() {
164
165         ReplicatedLog replicatedLog = context.getReplicatedLog();
166         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
167                 1, new MockRaftActorContext.MockPayload("1")));
168         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
169                 2, new MockRaftActorContext.MockPayload("2")));
170         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
171                 3, new MockRaftActorContext.MockPayload("3")));
172
173         byte[] snapshotBytes = {1,2,3,4,5};
174
175         ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
176                 4, new MockRaftActorContext.MockPayload("4", 4));
177
178         ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
179                 5, new MockRaftActorContext.MockPayload("5", 5));
180
181         long lastAppliedDuringSnapshotCapture = 3;
182         long lastIndexDuringSnapshotCapture = 5;
183         long electionTerm = 2;
184         String electionVotedFor = "member-2";
185
186         Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
187                 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
188
189         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
190         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
191
192         sendMessageToSupport(snapshotOffer);
193
194         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
195         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
196         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
197         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
198         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
199         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
200         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
201         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
202         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
203         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
204
205         verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
206     }
207
208     @Test
209     public void testOnRecoveryCompletedWithRemainingBatch() {
210         ReplicatedLog replicatedLog = context.getReplicatedLog();
211         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
212                 0, new MockRaftActorContext.MockPayload("0")));
213         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
214                 1, new MockRaftActorContext.MockPayload("1")));
215
216         sendMessageToSupport(new ApplyJournalEntries(1));
217
218         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
219
220         assertEquals("Last applied", 1, context.getLastApplied());
221         assertEquals("Commit index", 1, context.getCommitIndex());
222
223         InOrder inOrder = Mockito.inOrder(mockCohort);
224         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
225
226         for(int i = 0; i < replicatedLog.size(); i++) {
227             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
228         }
229
230         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
231         inOrder.verify(mockCohort).getRestoreFromSnapshot();
232         inOrder.verifyNoMoreInteractions();
233     }
234
235     @Test
236     public void testOnRecoveryCompletedWithNoRemainingBatch() {
237         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
238
239         verify(mockCohort).getRestoreFromSnapshot();
240         verifyNoMoreInteractions(mockCohort);
241     }
242
243     @Test
244     public void testOnDeprecatedDeleteEntries() {
245         ReplicatedLog replicatedLog = context.getReplicatedLog();
246         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
247                 0, new MockRaftActorContext.MockPayload("0")));
248         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
249                 1, new MockRaftActorContext.MockPayload("1")));
250         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
251                 2, new MockRaftActorContext.MockPayload("2")));
252
253         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
254
255         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
256         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
257     }
258
259     @Test
260     public void testOnDeleteEntries() {
261         ReplicatedLog replicatedLog = context.getReplicatedLog();
262         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
263                 0, new MockRaftActorContext.MockPayload("0")));
264         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
265                 1, new MockRaftActorContext.MockPayload("1")));
266         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
267                 2, new MockRaftActorContext.MockPayload("2")));
268
269         sendMessageToSupport(new DeleteEntries(1));
270
271         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
272         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
273     }
274
275     @Test
276     public void testUpdateElectionTerm() {
277
278         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
279
280         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
281         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
282     }
283
284     @Test
285     public void testDeprecatedUpdateElectionTerm() {
286
287         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
288
289         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
290         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
291     }
292
293     @SuppressWarnings("unchecked")
294     @Test
295     public void testDataRecoveredWithPersistenceDisabled() {
296         doReturn(false).when(mockPersistence).isRecoveryApplicable();
297         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
298
299         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
300
301         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
302         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
303
304         sendMessageToSupport(snapshotOffer);
305
306         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
307                 4, new MockRaftActorContext.MockPayload("4")));
308         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
309                 5, new MockRaftActorContext.MockPayload("5")));
310
311         sendMessageToSupport(new ApplyJournalEntries(4));
312
313         sendMessageToSupport(new DeleteEntries(5));
314
315         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
316
317         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
318         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
319         assertEquals("Last applied", -1, context.getLastApplied());
320         assertEquals("Commit index", -1, context.getCommitIndex());
321         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
322         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
323
324         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
325         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
326
327         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
328
329         verify(mockCohort).getRestoreFromSnapshot();
330         verifyNoMoreInteractions(mockCohort);
331
332         verify(mockPersistentProvider).deleteMessages(10L);
333         verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
334         verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
335     }
336
337     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
338         return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
339             @Override
340             public boolean matches(Object argument) {
341                 UpdateElectionTerm other = (UpdateElectionTerm) argument;
342                 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
343             }
344
345             @Override
346             public void describeTo(Description description) {
347                 description.appendValue(new UpdateElectionTerm(term, votedFor));
348             }
349         });
350     }
351
352     @Test
353     public void testNoDataRecoveredWithPersistenceDisabled() {
354         doReturn(false).when(mockPersistence).isRecoveryApplicable();
355
356         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
357
358         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
359         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
360
361         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
362
363         verify(mockCohort).getRestoreFromSnapshot();
364         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
365     }
366
367     @Test
368     public void testServerConfigurationPayloadApplied() {
369         String follower1 = "follower1";
370         String follower2 = "follower2";
371         String follower3 = "follower3";
372
373         context.addToPeers(follower1, null, VotingState.VOTING);
374         context.addToPeers(follower2, null, VotingState.VOTING);
375
376         //add new Server
377         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
378                 new ServerInfo(localId, true),
379                 new ServerInfo(follower1, true),
380                 new ServerInfo(follower2, false),
381                 new ServerInfo(follower3, true)));
382
383         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
384
385         //verify new peers
386         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
387         assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
388                 Sets.newHashSet(context.getPeerIds()));
389         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
390         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
391         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
392
393         sendMessageToSupport(new ApplyJournalEntries(0));
394
395         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
396         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
397
398         //remove existing follower1
399         obj = new ServerConfigurationPayload(Arrays.asList(
400                 new ServerInfo(localId, true),
401                 new ServerInfo("follower2", true),
402                 new ServerInfo("follower3", true)));
403
404         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
405
406         //verify new peers
407         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
408         assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
409     }
410
411     @Test
412     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
413         doReturn(false).when(mockPersistence).isRecoveryApplicable();
414
415         String follower = "follower";
416         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
417                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
418
419         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
420
421         //verify new peers
422         assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
423     }
424
425     @Test
426     public void testOnSnapshotOfferWithServerConfiguration() {
427         long electionTerm = 2;
428         String electionVotedFor = "member-2";
429         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
430                                                         new ServerInfo(localId, true),
431                                                         new ServerInfo("follower1", true),
432                                                         new ServerInfo("follower2", true)));
433
434         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
435                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
436
437         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
438         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
439
440         sendMessageToSupport(snapshotOffer);
441
442         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
443         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
444         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
445         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
446         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
447             Sets.newHashSet(context.getPeerIds()));
448     }
449 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.