Bump versions to 2.0.3-SNAPSHOT
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.ArgumentMatchers.anyInt;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20
21 import akka.persistence.RecoveryCompleted;
22 import akka.persistence.SnapshotMetadata;
23 import akka.persistence.SnapshotOffer;
24 import com.google.common.collect.Sets;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.ArgumentMatchers;
31 import org.mockito.InOrder;
32 import org.mockito.Mock;
33 import org.mockito.Mockito;
34 import org.mockito.MockitoAnnotations;
35 import org.opendaylight.controller.cluster.DataPersistenceProvider;
36 import org.opendaylight.controller.cluster.PersistentDataProvider;
37 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
39 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
40 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
41 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
42 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
43 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
45 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
46 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51  * Unit tests for RaftActorRecoverySupport.
52  *
53  * @author Thomas Pantelis
54  */
55 public class RaftActorRecoverySupportTest {
56
57     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
58
59     @Mock
60     private DataPersistenceProvider mockPersistence;
61
62
63     @Mock
64     private RaftActorRecoveryCohort mockCohort;
65
66     @Mock
67     private RaftActorSnapshotCohort mockSnapshotCohort;
68
69     @Mock
70     PersistentDataProvider mockPersistentProvider;
71
72     private RaftActorRecoverySupport support;
73
74     private RaftActorContext context;
75     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
76     private final String localId = "leader";
77
78
79     @Before
80     public void setup() {
81         MockitoAnnotations.initMocks(this);
82
83         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
84                 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
85                 mockPersistence, applyState -> { }, LOG,  MoreExecutors.directExecutor());
86
87         support = new RaftActorRecoverySupport(context, mockCohort);
88
89         doReturn(true).when(mockPersistence).isRecoveryApplicable();
90
91         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
92     }
93
94     private void sendMessageToSupport(final Object message) {
95         sendMessageToSupport(message, false);
96     }
97
98     private void sendMessageToSupport(final Object message, final boolean expComplete) {
99         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
100         assertEquals("complete", expComplete, complete);
101     }
102
103     @Test
104     public void testOnReplicatedLogEntry() {
105         ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
106
107         sendMessageToSupport(logEntry);
108
109         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
110         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
111         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
112         assertEquals("Last applied", -1, context.getLastApplied());
113         assertEquals("Commit index", -1, context.getCommitIndex());
114         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
115         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
116     }
117
118     @Test
119     public void testOnApplyJournalEntries() {
120         configParams.setJournalRecoveryLogBatchSize(5);
121
122         ReplicatedLog replicatedLog = context.getReplicatedLog();
123         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
124         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
125         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
126         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
127         replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
128         replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
129
130         sendMessageToSupport(new ApplyJournalEntries(2));
131
132         assertEquals("Last applied", 2, context.getLastApplied());
133         assertEquals("Commit index", 2, context.getCommitIndex());
134
135         sendMessageToSupport(new ApplyJournalEntries(4));
136
137         assertEquals("Last applied", 4, context.getLastApplied());
138         assertEquals("Last applied", 4, context.getLastApplied());
139
140         sendMessageToSupport(new ApplyJournalEntries(5));
141
142         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
143         assertEquals("Last applied", 5, context.getLastApplied());
144         assertEquals("Commit index", 5, context.getCommitIndex());
145         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
146         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
147
148         InOrder inOrder = Mockito.inOrder(mockCohort);
149         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
150
151         for (int i = 0; i < replicatedLog.size() - 1; i++) {
152             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
153         }
154
155         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
156         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
157         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
158
159         inOrder.verifyNoMoreInteractions();
160     }
161
162     @Test
163     public void testOnSnapshotOffer() {
164
165         ReplicatedLog replicatedLog = context.getReplicatedLog();
166         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
167         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
168         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
169
170         ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
171                 new MockRaftActorContext.MockPayload("4", 4));
172
173         ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
174                 new MockRaftActorContext.MockPayload("5", 5));
175
176         long lastAppliedDuringSnapshotCapture = 3;
177         long lastIndexDuringSnapshotCapture = 5;
178         long electionTerm = 2;
179         String electionVotedFor = "member-2";
180
181         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
182         Snapshot snapshot = Snapshot.create(snapshotState,
183                 Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
184                 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
185
186         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
187         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
188
189         sendMessageToSupport(snapshotOffer);
190
191         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
192         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
193         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
194         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
195         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
196         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
197         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
198         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
199         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
200         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
201
202         verify(mockCohort).applyRecoverySnapshot(snapshotState);
203     }
204
205     @Test
206     public void testOnRecoveryCompletedWithRemainingBatch() {
207         ReplicatedLog replicatedLog = context.getReplicatedLog();
208         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
209         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
210
211         sendMessageToSupport(new ApplyJournalEntries(1));
212
213         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
214
215         assertEquals("Last applied", 1, context.getLastApplied());
216         assertEquals("Commit index", 1, context.getCommitIndex());
217
218         InOrder inOrder = Mockito.inOrder(mockCohort);
219         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
220
221         for (int i = 0; i < replicatedLog.size(); i++) {
222             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
223         }
224
225         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
226         inOrder.verify(mockCohort).getRestoreFromSnapshot();
227         inOrder.verifyNoMoreInteractions();
228     }
229
230     @Test
231     public void testOnRecoveryCompletedWithNoRemainingBatch() {
232         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
233
234         verify(mockCohort).getRestoreFromSnapshot();
235         verifyNoMoreInteractions(mockCohort);
236     }
237
238     @Test
239     public void testOnDeleteEntries() {
240         ReplicatedLog replicatedLog = context.getReplicatedLog();
241         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
242         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
243         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
244
245         sendMessageToSupport(new DeleteEntries(1));
246
247         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
248         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
249     }
250
251     @Test
252     public void testUpdateElectionTerm() {
253
254         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
255
256         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
257         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
258     }
259
260     @Test
261     public void testDataRecoveredWithPersistenceDisabled() {
262         doNothing().when(mockCohort).applyRecoverySnapshot(any());
263         doReturn(false).when(mockPersistence).isRecoveryApplicable();
264         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
265
266         Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
267                 Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
268         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
269
270         sendMessageToSupport(snapshotOffer);
271
272         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
273
274         sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
275         sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
276
277         sendMessageToSupport(new ApplyJournalEntries(4));
278
279         sendMessageToSupport(new DeleteEntries(5));
280
281         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
282         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
283         assertEquals("Last applied", -1, context.getLastApplied());
284         assertEquals("Commit index", -1, context.getCommitIndex());
285         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
286         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
287
288         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
289         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
290
291         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
292
293         verify(mockCohort, never()).applyRecoverySnapshot(any());
294         verify(mockCohort, never()).getRestoreFromSnapshot();
295         verifyNoMoreInteractions(mockCohort);
296
297         verify(mockPersistentProvider).deleteMessages(10L);
298     }
299
300     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
301         return ArgumentMatchers.argThat(
302             other -> term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
303     }
304
305     @Test
306     public void testNoDataRecoveredWithPersistenceDisabled() {
307         doReturn(false).when(mockPersistence).isRecoveryApplicable();
308
309         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
310
311         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
312         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
313
314         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
315
316         verify(mockCohort).getRestoreFromSnapshot();
317         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
318     }
319
320     @Test
321     public void testServerConfigurationPayloadApplied() {
322         String follower1 = "follower1";
323         String follower2 = "follower2";
324         String follower3 = "follower3";
325
326         context.addToPeers(follower1, null, VotingState.VOTING);
327         context.addToPeers(follower2, null, VotingState.VOTING);
328
329         //add new Server
330         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
331                 new ServerInfo(localId, true),
332                 new ServerInfo(follower1, true),
333                 new ServerInfo(follower2, false),
334                 new ServerInfo(follower3, true)));
335
336         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
337
338         //verify new peers
339         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
340         assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
341                 Sets.newHashSet(context.getPeerIds()));
342         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
343         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
344         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
345
346         sendMessageToSupport(new ApplyJournalEntries(0));
347
348         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
349         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
350
351         //remove existing follower1
352         obj = new ServerConfigurationPayload(Arrays.asList(
353                 new ServerInfo(localId, true),
354                 new ServerInfo("follower2", true),
355                 new ServerInfo("follower3", true)));
356
357         sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
358
359         //verify new peers
360         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
361         assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
362     }
363
364     @Test
365     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
366         doReturn(false).when(mockPersistence).isRecoveryApplicable();
367
368         String follower = "follower";
369         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
370                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
371
372         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
373
374         //verify new peers
375         assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
376     }
377
378     @Test
379     public void testOnSnapshotOfferWithServerConfiguration() {
380         long electionTerm = 2;
381         String electionVotedFor = "member-2";
382         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
383                                                         new ServerInfo(localId, true),
384                                                         new ServerInfo("follower1", true),
385                                                         new ServerInfo("follower2", true)));
386
387         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
388         Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
389                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
390
391         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
392         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
393
394         sendMessageToSupport(snapshotOffer);
395
396         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
397         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
398         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
399         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
400         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
401             Sets.newHashSet(context.getPeerIds()));
402     }
403 }