Bug 5740: Change TimeoutNow and Shutdown to externalizable proxy
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Matchers.anyObject;
16 import static org.mockito.Mockito.doAnswer;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
22
23 import akka.persistence.RecoveryCompleted;
24 import akka.persistence.SnapshotMetadata;
25 import akka.persistence.SnapshotOffer;
26 import com.google.common.collect.Sets;
27 import java.io.Serializable;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
31 import org.apache.commons.lang3.SerializationUtils;
32 import org.hamcrest.Description;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.ArgumentMatcher;
36 import org.mockito.InOrder;
37 import org.mockito.Matchers;
38 import org.mockito.Mock;
39 import org.mockito.Mockito;
40 import org.mockito.MockitoAnnotations;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.PersistentDataProvider;
43 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
44 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
45 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
46 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
47 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
48 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
49 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
50 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
51 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
52 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Unit tests for RaftActorRecoverySupport.
58  *
59  * @author Thomas Pantelis
60  */
61 public class RaftActorRecoverySupportTest {
62
    // Logger handed to the ElectionTermImpl and RaftActorContextImpl instances built in setup().
    private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);

    // Persistence provider given to the context; setup() stubs isRecoveryApplicable() to return true
    // and individual tests override that to false.
    @Mock
    private DataPersistenceProvider mockPersistence;


    // Recovery cohort passed to the RaftActorRecoverySupport under test; tests verify the calls made on it.
    @Mock
    private RaftActorRecoveryCohort mockCohort;

    // NOTE(review): not referenced by any test method visible in this class.
    @Mock
    private RaftActorSnapshotCohort mockSnapshotCohort;

    // Provider given to the ElectionTermImpl and passed as the second argument of handleRecoveryMessage().
    @Mock
    PersistentDataProvider mockPersistentProvider;

    // Object under test, re-created in setup().
    private RaftActorRecoverySupport support;

    // Context shared by the support instance and the assertions, re-created in setup().
    private RaftActorContext context;
    // Config params given to the context; testOnApplyJournalEntries adjusts the journal recovery batch size.
    private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
    // Id passed to the context in setup() and used for the local ServerInfo in server configuration payloads.
    private final String localId = "leader";
83
84
85     @Before
86     public void setup() {
87         MockitoAnnotations.initMocks(this);
88
89         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
90                 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
91                 mockPersistence, applyState -> { }, LOG);
92
93         support = new RaftActorRecoverySupport(context, mockCohort);
94
95         doReturn(true).when(mockPersistence).isRecoveryApplicable();
96
97         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
98     }
99
100     private void sendMessageToSupport(Object message) {
101         sendMessageToSupport(message, false);
102     }
103
104     private void sendMessageToSupport(Object message, boolean expComplete) {
105         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
106         assertEquals("complete", expComplete, complete);
107     }
108
109     @Test
110     public void testOnReplicatedLogEntry() {
111         ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
112
113         sendMessageToSupport(logEntry);
114
115         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
116         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
117         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
118         assertEquals("Last applied", -1, context.getLastApplied());
119         assertEquals("Commit index", -1, context.getCommitIndex());
120         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
121         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
122     }
123
124     @Test
125     public void testOnApplyJournalEntries() {
126         configParams.setJournalRecoveryLogBatchSize(5);
127
128         ReplicatedLog replicatedLog = context.getReplicatedLog();
129         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
130         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
131         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
132         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
133         replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
134         replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
135
136         sendMessageToSupport(new ApplyJournalEntries(2));
137
138         assertEquals("Last applied", 2, context.getLastApplied());
139         assertEquals("Commit index", 2, context.getCommitIndex());
140
141         sendMessageToSupport(new ApplyJournalEntries(4));
142
143         assertEquals("Last applied", 4, context.getLastApplied());
144         assertEquals("Last applied", 4, context.getLastApplied());
145
146         sendMessageToSupport(new ApplyJournalEntries(5));
147
148         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
149         assertEquals("Last applied", 5, context.getLastApplied());
150         assertEquals("Commit index", 5, context.getCommitIndex());
151         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
152         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
153
154         InOrder inOrder = Mockito.inOrder(mockCohort);
155         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
156
157         for (int i = 0; i < replicatedLog.size() - 1; i++) {
158             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
159         }
160
161         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
162         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
163         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
164
165         inOrder.verifyNoMoreInteractions();
166     }
167
168     @Test
169     public void testOnSnapshotOffer() {
170
171         ReplicatedLog replicatedLog = context.getReplicatedLog();
172         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
173         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
174         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
175
176         ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
177                 new MockRaftActorContext.MockPayload("4", 4));
178
179         ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
180                 new MockRaftActorContext.MockPayload("5", 5));
181
182         long lastAppliedDuringSnapshotCapture = 3;
183         long lastIndexDuringSnapshotCapture = 5;
184         long electionTerm = 2;
185         String electionVotedFor = "member-2";
186
187         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
188         Snapshot snapshot = Snapshot.create(snapshotState,
189                 Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
190                 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
191
192         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
193         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
194
195         sendMessageToSupport(snapshotOffer);
196
197         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
198         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
199         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
200         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
201         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
202         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
203         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
204         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
205         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
206         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
207
208         verify(mockCohort).applyRecoverySnapshot(snapshotState);
209     }
210
211     @Deprecated
212     @Test
213     public void testOnSnapshotOfferWithPreCarbonSnapshot() {
214
215         ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
216                 new MockRaftActorContext.MockPayload("4", 4));
217
218         ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
219                 new MockRaftActorContext.MockPayload("5", 5));
220
221         long lastAppliedDuringSnapshotCapture = 3;
222         long lastIndexDuringSnapshotCapture = 5;
223         long electionTerm = 2;
224         String electionVotedFor = "member-2";
225
226         List<Object> snapshotData = Arrays.asList(new MockPayload("1"));
227         final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData);
228
229         org.opendaylight.controller.cluster.raft.Snapshot snapshot = org.opendaylight.controller.cluster.raft.Snapshot
230             .create(SerializationUtils.serialize((Serializable) snapshotData),
231                 Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
232                 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
233
234         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
235         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
236
237         doAnswer(invocation -> new MockSnapshotState(SerializationUtils.deserialize(
238             invocation.getArgumentAt(0, byte[].class))))
239                 .when(mockCohort).deserializePreCarbonSnapshot(any(byte[].class));
240
241         sendMessageToSupport(snapshotOffer);
242
243         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
244         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
245         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
246         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
247         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
248         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
249         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
250         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
251         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
252         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
253
254         verify(mockCohort).applyRecoverySnapshot(snapshotState);
255     }
256
257     @Test
258     public void testOnRecoveryCompletedWithRemainingBatch() {
259         ReplicatedLog replicatedLog = context.getReplicatedLog();
260         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
261         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
262
263         sendMessageToSupport(new ApplyJournalEntries(1));
264
265         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
266
267         assertEquals("Last applied", 1, context.getLastApplied());
268         assertEquals("Commit index", 1, context.getCommitIndex());
269
270         InOrder inOrder = Mockito.inOrder(mockCohort);
271         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
272
273         for (int i = 0; i < replicatedLog.size(); i++) {
274             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
275         }
276
277         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
278         inOrder.verify(mockCohort).getRestoreFromSnapshot();
279         inOrder.verifyNoMoreInteractions();
280     }
281
282     @Test
283     public void testOnRecoveryCompletedWithNoRemainingBatch() {
284         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
285
286         verify(mockCohort).getRestoreFromSnapshot();
287         verifyNoMoreInteractions(mockCohort);
288     }
289
290     @Test
291     public void testOnDeleteEntries() {
292         ReplicatedLog replicatedLog = context.getReplicatedLog();
293         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
294         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
295         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
296
297         sendMessageToSupport(new DeleteEntries(1));
298
299         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
300         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
301     }
302
303     @Test
304     public void testUpdateElectionTerm() {
305
306         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
307
308         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
309         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
310     }
311
312     @Test
313     public void testDataRecoveredWithPersistenceDisabled() {
314         doNothing().when(mockCohort).applyRecoverySnapshot(anyObject());
315         doReturn(false).when(mockPersistence).isRecoveryApplicable();
316         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
317
318         Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
319                 Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
320         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
321
322         sendMessageToSupport(snapshotOffer);
323
324         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
325
326         sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
327         sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
328
329         sendMessageToSupport(new ApplyJournalEntries(4));
330
331         sendMessageToSupport(new DeleteEntries(5));
332
333         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
334         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
335         assertEquals("Last applied", -1, context.getLastApplied());
336         assertEquals("Commit index", -1, context.getCommitIndex());
337         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
338         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
339
340         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
341         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
342
343         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
344
345         verify(mockCohort, never()).applyRecoverySnapshot(anyObject());
346         verify(mockCohort, never()).getRestoreFromSnapshot();
347         verifyNoMoreInteractions(mockCohort);
348
349         verify(mockPersistentProvider).deleteMessages(10L);
350     }
351
352     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
353         return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
354             @Override
355             public boolean matches(Object argument) {
356                 UpdateElectionTerm other = (UpdateElectionTerm) argument;
357                 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
358             }
359
360             @Override
361             public void describeTo(Description description) {
362                 description.appendValue(new UpdateElectionTerm(term, votedFor));
363             }
364         });
365     }
366
367     @Test
368     public void testNoDataRecoveredWithPersistenceDisabled() {
369         doReturn(false).when(mockPersistence).isRecoveryApplicable();
370
371         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
372
373         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
374         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
375
376         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
377
378         verify(mockCohort).getRestoreFromSnapshot();
379         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
380     }
381
382     @Test
383     public void testServerConfigurationPayloadApplied() {
384         String follower1 = "follower1";
385         String follower2 = "follower2";
386         String follower3 = "follower3";
387
388         context.addToPeers(follower1, null, VotingState.VOTING);
389         context.addToPeers(follower2, null, VotingState.VOTING);
390
391         //add new Server
392         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
393                 new ServerInfo(localId, true),
394                 new ServerInfo(follower1, true),
395                 new ServerInfo(follower2, false),
396                 new ServerInfo(follower3, true)));
397
398         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
399
400         //verify new peers
401         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
402         assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
403                 Sets.newHashSet(context.getPeerIds()));
404         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
405         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
406         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
407
408         sendMessageToSupport(new ApplyJournalEntries(0));
409
410         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
411         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
412
413         //remove existing follower1
414         obj = new ServerConfigurationPayload(Arrays.asList(
415                 new ServerInfo(localId, true),
416                 new ServerInfo("follower2", true),
417                 new ServerInfo("follower3", true)));
418
419         sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
420
421         //verify new peers
422         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
423         assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
424     }
425
426     @Test
427     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
428         doReturn(false).when(mockPersistence).isRecoveryApplicable();
429
430         String follower = "follower";
431         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
432                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
433
434         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
435
436         //verify new peers
437         assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
438     }
439
440     @Test
441     public void testOnSnapshotOfferWithServerConfiguration() {
442         long electionTerm = 2;
443         String electionVotedFor = "member-2";
444         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
445                                                         new ServerInfo(localId, true),
446                                                         new ServerInfo("follower1", true),
447                                                         new ServerInfo("follower2", true)));
448
449         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
450         Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
451                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
452
453         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
454         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
455
456         sendMessageToSupport(snapshotOffer);
457
458         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
459         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
460         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
461         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
462         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
463             Sets.newHashSet(context.getPeerIds()));
464     }
465 }