Bug 8606: Continue leadership transfer on pauseLeader timeout
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Matchers.anyObject;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21
22 import akka.persistence.RecoveryCompleted;
23 import akka.persistence.SnapshotMetadata;
24 import akka.persistence.SnapshotOffer;
25 import com.google.common.collect.Sets;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import org.hamcrest.Description;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.ArgumentMatcher;
32 import org.mockito.InOrder;
33 import org.mockito.Matchers;
34 import org.mockito.Mock;
35 import org.mockito.Mockito;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.DataPersistenceProvider;
38 import org.opendaylight.controller.cluster.PersistentDataProvider;
39 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
40 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
41 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
42 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
43 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
44 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
45 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
46 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
47 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
48 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * Unit tests for RaftActorRecoverySupport.
54  *
55  * @author Thomas Pantelis
56  */
57 public class RaftActorRecoverySupportTest {
58
59     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
60
61     @Mock
62     private DataPersistenceProvider mockPersistence;
63
64
65     @Mock
66     private RaftActorRecoveryCohort mockCohort;
67
68     @Mock
69     private RaftActorSnapshotCohort mockSnapshotCohort;
70
71     @Mock
72     PersistentDataProvider mockPersistentProvider;
73
74     private RaftActorRecoverySupport support;
75
76     private RaftActorContext context;
77     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
78     private final String localId = "leader";
79
80
81     @Before
82     public void setup() {
83         MockitoAnnotations.initMocks(this);
84
85         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
86                 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
87                 mockPersistence, applyState -> { }, LOG);
88
89         support = new RaftActorRecoverySupport(context, mockCohort);
90
91         doReturn(true).when(mockPersistence).isRecoveryApplicable();
92
93         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
94     }
95
96     private void sendMessageToSupport(Object message) {
97         sendMessageToSupport(message, false);
98     }
99
100     private void sendMessageToSupport(Object message, boolean expComplete) {
101         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
102         assertEquals("complete", expComplete, complete);
103     }
104
105     @Test
106     public void testOnReplicatedLogEntry() {
107         ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
108
109         sendMessageToSupport(logEntry);
110
111         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
112         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
113         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
114         assertEquals("Last applied", -1, context.getLastApplied());
115         assertEquals("Commit index", -1, context.getCommitIndex());
116         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
117         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
118     }
119
120     @Test
121     public void testOnApplyJournalEntries() {
122         configParams.setJournalRecoveryLogBatchSize(5);
123
124         ReplicatedLog replicatedLog = context.getReplicatedLog();
125         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
126         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
127         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
128         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
129         replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
130         replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
131
132         sendMessageToSupport(new ApplyJournalEntries(2));
133
134         assertEquals("Last applied", 2, context.getLastApplied());
135         assertEquals("Commit index", 2, context.getCommitIndex());
136
137         sendMessageToSupport(new ApplyJournalEntries(4));
138
139         assertEquals("Last applied", 4, context.getLastApplied());
140         assertEquals("Last applied", 4, context.getLastApplied());
141
142         sendMessageToSupport(new ApplyJournalEntries(5));
143
144         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
145         assertEquals("Last applied", 5, context.getLastApplied());
146         assertEquals("Commit index", 5, context.getCommitIndex());
147         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
148         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
149
150         InOrder inOrder = Mockito.inOrder(mockCohort);
151         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
152
153         for (int i = 0; i < replicatedLog.size() - 1; i++) {
154             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
155         }
156
157         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
158         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
159         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
160
161         inOrder.verifyNoMoreInteractions();
162     }
163
164     @Test
165     public void testOnSnapshotOffer() {
166
167         ReplicatedLog replicatedLog = context.getReplicatedLog();
168         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
169         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
170         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
171
172         ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
173                 new MockRaftActorContext.MockPayload("4", 4));
174
175         ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
176                 new MockRaftActorContext.MockPayload("5", 5));
177
178         long lastAppliedDuringSnapshotCapture = 3;
179         long lastIndexDuringSnapshotCapture = 5;
180         long electionTerm = 2;
181         String electionVotedFor = "member-2";
182
183         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
184         Snapshot snapshot = Snapshot.create(snapshotState,
185                 Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
186                 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
187
188         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
189         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
190
191         sendMessageToSupport(snapshotOffer);
192
193         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
194         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
195         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
196         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
197         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
198         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
199         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
200         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
201         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
202         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
203
204         verify(mockCohort).applyRecoverySnapshot(snapshotState);
205     }
206
207     @Test
208     public void testOnRecoveryCompletedWithRemainingBatch() {
209         ReplicatedLog replicatedLog = context.getReplicatedLog();
210         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
211         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
212
213         sendMessageToSupport(new ApplyJournalEntries(1));
214
215         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
216
217         assertEquals("Last applied", 1, context.getLastApplied());
218         assertEquals("Commit index", 1, context.getCommitIndex());
219
220         InOrder inOrder = Mockito.inOrder(mockCohort);
221         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
222
223         for (int i = 0; i < replicatedLog.size(); i++) {
224             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
225         }
226
227         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
228         inOrder.verify(mockCohort).getRestoreFromSnapshot();
229         inOrder.verifyNoMoreInteractions();
230     }
231
232     @Test
233     public void testOnRecoveryCompletedWithNoRemainingBatch() {
234         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
235
236         verify(mockCohort).getRestoreFromSnapshot();
237         verifyNoMoreInteractions(mockCohort);
238     }
239
240     @Test
241     public void testOnDeleteEntries() {
242         ReplicatedLog replicatedLog = context.getReplicatedLog();
243         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
244         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
245         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
246
247         sendMessageToSupport(new DeleteEntries(1));
248
249         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
250         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
251     }
252
253     @Test
254     public void testUpdateElectionTerm() {
255
256         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
257
258         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
259         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
260     }
261
262     @Test
263     public void testDataRecoveredWithPersistenceDisabled() {
264         doNothing().when(mockCohort).applyRecoverySnapshot(anyObject());
265         doReturn(false).when(mockPersistence).isRecoveryApplicable();
266         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
267
268         Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
269                 Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
270         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
271
272         sendMessageToSupport(snapshotOffer);
273
274         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
275
276         sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
277         sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
278
279         sendMessageToSupport(new ApplyJournalEntries(4));
280
281         sendMessageToSupport(new DeleteEntries(5));
282
283         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
284         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
285         assertEquals("Last applied", -1, context.getLastApplied());
286         assertEquals("Commit index", -1, context.getCommitIndex());
287         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
288         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
289
290         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
291         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
292
293         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
294
295         verify(mockCohort, never()).applyRecoverySnapshot(anyObject());
296         verify(mockCohort, never()).getRestoreFromSnapshot();
297         verifyNoMoreInteractions(mockCohort);
298
299         verify(mockPersistentProvider).deleteMessages(10L);
300     }
301
302     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
303         return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
304             @Override
305             public boolean matches(Object argument) {
306                 UpdateElectionTerm other = (UpdateElectionTerm) argument;
307                 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
308             }
309
310             @Override
311             public void describeTo(Description description) {
312                 description.appendValue(new UpdateElectionTerm(term, votedFor));
313             }
314         });
315     }
316
317     @Test
318     public void testNoDataRecoveredWithPersistenceDisabled() {
319         doReturn(false).when(mockPersistence).isRecoveryApplicable();
320
321         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
322
323         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
324         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
325
326         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
327
328         verify(mockCohort).getRestoreFromSnapshot();
329         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
330     }
331
332     @Test
333     public void testServerConfigurationPayloadApplied() {
334         String follower1 = "follower1";
335         String follower2 = "follower2";
336         String follower3 = "follower3";
337
338         context.addToPeers(follower1, null, VotingState.VOTING);
339         context.addToPeers(follower2, null, VotingState.VOTING);
340
341         //add new Server
342         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
343                 new ServerInfo(localId, true),
344                 new ServerInfo(follower1, true),
345                 new ServerInfo(follower2, false),
346                 new ServerInfo(follower3, true)));
347
348         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
349
350         //verify new peers
351         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
352         assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
353                 Sets.newHashSet(context.getPeerIds()));
354         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
355         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
356         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
357
358         sendMessageToSupport(new ApplyJournalEntries(0));
359
360         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
361         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
362
363         //remove existing follower1
364         obj = new ServerConfigurationPayload(Arrays.asList(
365                 new ServerInfo(localId, true),
366                 new ServerInfo("follower2", true),
367                 new ServerInfo("follower3", true)));
368
369         sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
370
371         //verify new peers
372         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
373         assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
374     }
375
376     @Test
377     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
378         doReturn(false).when(mockPersistence).isRecoveryApplicable();
379
380         String follower = "follower";
381         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
382                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
383
384         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
385
386         //verify new peers
387         assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
388     }
389
390     @Test
391     public void testOnSnapshotOfferWithServerConfiguration() {
392         long electionTerm = 2;
393         String electionVotedFor = "member-2";
394         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
395                                                         new ServerInfo(localId, true),
396                                                         new ServerInfo("follower1", true),
397                                                         new ServerInfo("follower2", true)));
398
399         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
400         Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
401                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
402
403         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
404         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
405
406         sendMessageToSupport(snapshotOffer);
407
408         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
409         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
410         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
411         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
412         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
413             Sets.newHashSet(context.getPeerIds()));
414     }
415 }