Bug 2187: Persisting Actor peerIds' in snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.verify;
18 import static org.mockito.Mockito.verifyNoMoreInteractions;
19 import akka.japi.Procedure;
20 import akka.persistence.RecoveryCompleted;
21 import akka.persistence.SnapshotMetadata;
22 import akka.persistence.SnapshotOffer;
23 import akka.persistence.SnapshotSelectionCriteria;
24 import com.google.common.collect.Sets;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import org.hamcrest.Description;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.ArgumentMatcher;
31 import org.mockito.InOrder;
32 import org.mockito.Matchers;
33 import org.mockito.Mock;
34 import org.mockito.Mockito;
35 import org.mockito.MockitoAnnotations;
36 import org.opendaylight.controller.cluster.DataPersistenceProvider;
37 import org.opendaylight.controller.cluster.PersistentDataProvider;
38 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
42 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
43 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
44 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * Unit tests for RaftActorRecoverySupport.
50  *
51  * @author Thomas Pantelis
52  */
53 public class RaftActorRecoverySupportTest {
54
55     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
56
57     @Mock
58     private DataPersistenceProvider mockPersistence;
59
60     @Mock
61     private RaftActorBehavior mockBehavior;
62
63     @Mock
64     private RaftActorRecoveryCohort mockCohort;
65
66     @Mock
67     PersistentDataProvider mockPersistentProvider;
68
69     private RaftActorRecoverySupport support;
70
71     private RaftActorContext context;
72     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
73     private final String localId = "leader";
74
75
76     @Before
77     public void setup() {
78         MockitoAnnotations.initMocks(this);
79
80         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
81                 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
82
83         support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
84
85         doReturn(true).when(mockPersistence).isRecoveryApplicable();
86
87         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
88     }
89
90     private void sendMessageToSupport(Object message) {
91         sendMessageToSupport(message, false);
92     }
93
94     private void sendMessageToSupport(Object message, boolean expComplete) {
95         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
96         assertEquals("complete", expComplete, complete);
97     }
98
99     @Test
100     public void testOnReplicatedLogEntry() {
101         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
102                 1, new MockRaftActorContext.MockPayload("1", 5));
103
104         sendMessageToSupport(logEntry);
105
106         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
107         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
108         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
109         assertEquals("Last applied", -1, context.getLastApplied());
110         assertEquals("Commit index", -1, context.getCommitIndex());
111         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
112         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
113     }
114
115     @Test
116     public void testOnApplyJournalEntries() {
117         configParams.setJournalRecoveryLogBatchSize(5);
118
119         ReplicatedLog replicatedLog = context.getReplicatedLog();
120         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
121                 0, new MockRaftActorContext.MockPayload("0")));
122         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
123                 1, new MockRaftActorContext.MockPayload("1")));
124         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
125                 2, new MockRaftActorContext.MockPayload("2")));
126         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
127                 3, new MockRaftActorContext.MockPayload("3")));
128         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
129                 4, new MockRaftActorContext.MockPayload("4")));
130         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
131                 5, new MockRaftActorContext.MockPayload("5")));
132
133         sendMessageToSupport(new ApplyJournalEntries(2));
134
135         assertEquals("Last applied", 2, context.getLastApplied());
136         assertEquals("Commit index", 2, context.getCommitIndex());
137
138         sendMessageToSupport(new ApplyJournalEntries(4));
139
140         assertEquals("Last applied", 4, context.getLastApplied());
141         assertEquals("Last applied", 4, context.getLastApplied());
142
143         sendMessageToSupport(new ApplyJournalEntries(5));
144
145         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
146         assertEquals("Last applied", 5, context.getLastApplied());
147         assertEquals("Commit index", 5, context.getCommitIndex());
148         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
149         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
150
151         InOrder inOrder = Mockito.inOrder(mockCohort);
152         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
153
154         for(int i = 0; i < replicatedLog.size() - 1; i++) {
155             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
156         }
157
158         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
159         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
160         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
161
162         inOrder.verifyNoMoreInteractions();
163     }
164
165     @Test
166     public void testOnApplyLogEntries() {
167         ReplicatedLog replicatedLog = context.getReplicatedLog();
168         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
169                 0, new MockRaftActorContext.MockPayload("0")));
170
171         sendMessageToSupport(new ApplyLogEntries(0));
172
173         assertEquals("Last applied", 0, context.getLastApplied());
174         assertEquals("Commit index", 0, context.getCommitIndex());
175     }
176
177     @Test
178     public void testOnSnapshotOffer() {
179
180         ReplicatedLog replicatedLog = context.getReplicatedLog();
181         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
182                 1, new MockRaftActorContext.MockPayload("1")));
183         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
184                 2, new MockRaftActorContext.MockPayload("2")));
185         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
186                 3, new MockRaftActorContext.MockPayload("3")));
187
188         byte[] snapshotBytes = {1,2,3,4,5};
189
190         ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
191                 4, new MockRaftActorContext.MockPayload("4", 4));
192
193         ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
194                 5, new MockRaftActorContext.MockPayload("5", 5));
195
196         long lastAppliedDuringSnapshotCapture = 3;
197         long lastIndexDuringSnapshotCapture = 5;
198         long electionTerm = 2;
199         String electionVotedFor = "member-2";
200
201         Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
202                 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
203
204         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
205         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
206
207         sendMessageToSupport(snapshotOffer);
208
209         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
210         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
211         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
212         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
213         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
214         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
215         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
216         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
217         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
218         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
219
220         verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
221     }
222
223     @Test
224     public void testOnRecoveryCompletedWithRemainingBatch() {
225         ReplicatedLog replicatedLog = context.getReplicatedLog();
226         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
227                 0, new MockRaftActorContext.MockPayload("0")));
228         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
229                 1, new MockRaftActorContext.MockPayload("1")));
230
231         sendMessageToSupport(new ApplyJournalEntries(1));
232
233         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
234
235         assertEquals("Last applied", 1, context.getLastApplied());
236         assertEquals("Commit index", 1, context.getCommitIndex());
237
238         InOrder inOrder = Mockito.inOrder(mockCohort);
239         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
240
241         for(int i = 0; i < replicatedLog.size(); i++) {
242             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
243         }
244
245         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
246         inOrder.verify(mockCohort).getRestoreFromSnapshot();
247         inOrder.verifyNoMoreInteractions();
248     }
249
250     @Test
251     public void testOnRecoveryCompletedWithNoRemainingBatch() {
252         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
253
254         verify(mockCohort).getRestoreFromSnapshot();
255         verifyNoMoreInteractions(mockCohort);
256     }
257
258     @Test
259     public void testOnDeprecatedDeleteEntries() {
260         ReplicatedLog replicatedLog = context.getReplicatedLog();
261         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
262                 0, new MockRaftActorContext.MockPayload("0")));
263         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
264                 1, new MockRaftActorContext.MockPayload("1")));
265         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
266                 2, new MockRaftActorContext.MockPayload("2")));
267
268         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
269
270         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
271         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
272     }
273
274     @Test
275     public void testOnDeleteEntries() {
276         ReplicatedLog replicatedLog = context.getReplicatedLog();
277         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
278                 0, new MockRaftActorContext.MockPayload("0")));
279         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
280                 1, new MockRaftActorContext.MockPayload("1")));
281         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
282                 2, new MockRaftActorContext.MockPayload("2")));
283
284         sendMessageToSupport(new DeleteEntries(1));
285
286         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
287         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
288     }
289
290     @Test
291     public void testUpdateElectionTerm() {
292
293         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
294
295         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
296         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
297     }
298
299     @Test
300     public void testDeprecatedUpdateElectionTerm() {
301
302         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
303
304         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
305         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
306     }
307
308     @SuppressWarnings("unchecked")
309     @Test
310     public void testDataRecoveredWithPersistenceDisabled() {
311         doReturn(false).when(mockPersistence).isRecoveryApplicable();
312         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
313
314         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
315
316         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
317         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
318
319         sendMessageToSupport(snapshotOffer);
320
321         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
322                 4, new MockRaftActorContext.MockPayload("4")));
323         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
324                 5, new MockRaftActorContext.MockPayload("5")));
325
326         sendMessageToSupport(new ApplyJournalEntries(4));
327
328         sendMessageToSupport(new DeleteEntries(5));
329
330         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
331
332         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
333         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
334         assertEquals("Last applied", -1, context.getLastApplied());
335         assertEquals("Commit index", -1, context.getCommitIndex());
336         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
337         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
338
339         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
340         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
341
342         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
343
344         verify(mockCohort).getRestoreFromSnapshot();
345         verifyNoMoreInteractions(mockCohort);
346
347         verify(mockPersistentProvider).deleteMessages(10L);
348         verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
349         verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
350     }
351
352     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
353         return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
354             @Override
355             public boolean matches(Object argument) {
356                 UpdateElectionTerm other = (UpdateElectionTerm) argument;
357                 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
358             }
359
360             @Override
361             public void describeTo(Description description) {
362                 description.appendValue(new UpdateElectionTerm(term, votedFor));
363             }
364         });
365     }
366
367     @Test
368     public void testNoDataRecoveredWithPersistenceDisabled() {
369         doReturn(false).when(mockPersistence).isRecoveryApplicable();
370
371         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
372
373         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
374         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
375
376         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
377
378         verify(mockCohort).getRestoreFromSnapshot();
379         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
380     }
381
382     @Test
383     public void testServerConfigurationPayloadApplied() {
384         String follower1 = "follower1";
385         String follower2 = "follower2";
386         String follower3 = "follower3";
387
388         context.addToPeers(follower1, null, VotingState.VOTING);
389         context.addToPeers(follower2, null, VotingState.VOTING);
390
391         //add new Server
392         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
393                 new ServerInfo(localId, true),
394                 new ServerInfo(follower1, true),
395                 new ServerInfo(follower2, false),
396                 new ServerInfo(follower3, true)));
397
398         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
399
400         //verify new peers
401         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
402         assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
403                 Sets.newHashSet(context.getPeerIds()));
404         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
405         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
406         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
407
408         sendMessageToSupport(new ApplyJournalEntries(0));
409
410         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
411         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
412
413         //remove existing follower1
414         obj = new ServerConfigurationPayload(Arrays.asList(
415                 new ServerInfo(localId, true),
416                 new ServerInfo("follower2", true),
417                 new ServerInfo("follower3", true)));
418
419         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
420
421         //verify new peers
422         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
423         assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
424     }
425
426     @Test
427     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
428         doReturn(false).when(mockPersistence).isRecoveryApplicable();
429
430         String follower = "follower";
431         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
432                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
433
434         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
435
436         //verify new peers
437         assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
438     }
439
440     @Test
441     public void testOnSnapshotOfferWithServerConfiguration() {
442         long electionTerm = 2;
443         String electionVotedFor = "member-2";
444         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
445                                                         new ServerInfo(localId, true),
446                                                         new ServerInfo("follower1", true),
447                                                         new ServerInfo("follower2", true)));
448
449         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
450                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
451
452         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
453         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
454
455         sendMessageToSupport(snapshotOffer);
456
457         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
458         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
459         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
460         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
461         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
462             Sets.newHashSet(context.getPeerIds()));
463     }
464 }