Allow incremental recovery
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
65
66 /**
67  * Unit tests for RaftActorRecoverySupport.
68  *
69  * @author Thomas Pantelis
70  */
71 public class RaftActorRecoverySupportTest {
72
73     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
74
75     @Mock
76     private DataPersistenceProvider mockPersistence;
77
78     @Mock
79     private RaftActorRecoveryCohort mockCohort;
80
81     @Mock
82     PersistentDataProvider mockPersistentProvider;
83
84     ActorRef mockActorRef;
85
86     ActorSystem mockActorSystem;
87
88     private RaftActorRecoverySupport support;
89
90     private RaftActorContext context;
91     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
92     private final String localId = "leader";
93
94     @Before
95     public void setup() {
96         MockitoAnnotations.initMocks(this);
97         mockActorSystem = ActorSystem.create();
98         mockActorRef = mockActorSystem.actorOf(Props.create(DoNothingActor.class));
99         context = new RaftActorContextImpl(mockActorRef, null, localId,
100                 new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1,
101                 Collections.<String, String>emptyMap(), configParams, mockPersistence, applyState -> {
102         }, LOG, MoreExecutors.directExecutor());
103
104         support = new RaftActorRecoverySupport(context, mockCohort);
105
106         doReturn(true).when(mockPersistence).isRecoveryApplicable();
107
108         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
109     }
110
111     private void sendMessageToSupport(final Object message) {
112         sendMessageToSupport(message, false);
113     }
114
115     private void sendMessageToSupport(final Object message, final boolean expComplete) {
116         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
117         assertEquals("complete", expComplete, complete);
118     }
119
120     @Test
121     public void testOnReplicatedLogEntry() {
122         ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
123
124         sendMessageToSupport(logEntry);
125
126         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
127         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
128         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
129         assertEquals("Last applied", -1, context.getLastApplied());
130         assertEquals("Commit index", -1, context.getCommitIndex());
131         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
132         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
133     }
134
135     @Test
136     public void testOnApplyJournalEntries() {
137         configParams.setJournalRecoveryLogBatchSize(5);
138
139         ReplicatedLog replicatedLog = context.getReplicatedLog();
140         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
141         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
142         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
143         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
144         replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
145         replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
146
147         sendMessageToSupport(new ApplyJournalEntries(2));
148
149         assertEquals("Last applied", 2, context.getLastApplied());
150         assertEquals("Commit index", 2, context.getCommitIndex());
151
152         sendMessageToSupport(new ApplyJournalEntries(4));
153
154         assertEquals("Last applied", 4, context.getLastApplied());
155         assertEquals("Last applied", 4, context.getLastApplied());
156
157         sendMessageToSupport(new ApplyJournalEntries(5));
158
159         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
160         assertEquals("Last applied", 5, context.getLastApplied());
161         assertEquals("Commit index", 5, context.getCommitIndex());
162         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
163         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
164
165         InOrder inOrder = Mockito.inOrder(mockCohort);
166         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
167
168         for (int i = 0; i < replicatedLog.size() - 1; i++) {
169             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
170         }
171
172         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
173         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
174         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
175
176         inOrder.verifyNoMoreInteractions();
177     }
178
179     @Test
180     public void testIncrementalRecovery() {
181         int recoverySnapshotInterval = 3;
182         int numberOfEntries = 5;
183         configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
184         Consumer<Optional<OutputStream>> mockSnapshotConsumer = mock(Consumer.class);
185         context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer);
186
187         ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor();
188         ReplicatedLog replicatedLog = context.getReplicatedLog();
189
190         for (int i = 0; i <= numberOfEntries; i++) {
191             replicatedLog.append(new SimpleReplicatedLogEntry(i, 1,
192                 new MockRaftActorContext.MockPayload(String.valueOf(i))));
193         }
194
195         AtomicInteger entryCount = new AtomicInteger();
196         ScheduledFuture<?> applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> {
197             int run = entryCount.getAndIncrement();
198             LOG.info("Sending entry number {}", run);
199             sendMessageToSupport(new ApplyJournalEntries(run));
200         }, 0, 1, TimeUnit.SECONDS);
201
202         ScheduledFuture<Boolean> canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false),
203             numberOfEntries, TimeUnit.SECONDS);
204         try {
205             canceller.get();
206             verify(mockSnapshotConsumer, times(1)).accept(any());
207             applyEntriesExecutor.shutdown();
208         } catch (InterruptedException | ExecutionException e) {
209             Assert.fail();
210         }
211     }
212
213     @Test
214     public void testOnSnapshotOffer() {
215
216         ReplicatedLog replicatedLog = context.getReplicatedLog();
217         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
218         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
219         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
220
221         ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
222                 new MockRaftActorContext.MockPayload("4", 4));
223
224         ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
225                 new MockRaftActorContext.MockPayload("5", 5));
226
227         long lastAppliedDuringSnapshotCapture = 3;
228         long lastIndexDuringSnapshotCapture = 5;
229         long electionTerm = 2;
230         String electionVotedFor = "member-2";
231
232         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
233         Snapshot snapshot = Snapshot.create(snapshotState,
234                 Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
235                 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
236
237         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
238         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
239
240         sendMessageToSupport(snapshotOffer);
241
242         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
243         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
244         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
245         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
246         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
247         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
248         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
249         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
250         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
251         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
252
253         verify(mockCohort).applyRecoverySnapshot(snapshotState);
254     }
255
256     @Test
257     public void testOnRecoveryCompletedWithRemainingBatch() {
258         ReplicatedLog replicatedLog = context.getReplicatedLog();
259         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
260         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
261
262         sendMessageToSupport(new ApplyJournalEntries(1));
263
264         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
265
266         assertEquals("Last applied", 1, context.getLastApplied());
267         assertEquals("Commit index", 1, context.getCommitIndex());
268
269         InOrder inOrder = Mockito.inOrder(mockCohort);
270         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
271
272         for (int i = 0; i < replicatedLog.size(); i++) {
273             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
274         }
275
276         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
277         inOrder.verify(mockCohort).getRestoreFromSnapshot();
278         inOrder.verifyNoMoreInteractions();
279     }
280
281     @Test
282     public void testOnRecoveryCompletedWithNoRemainingBatch() {
283         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
284
285         verify(mockCohort).getRestoreFromSnapshot();
286         verifyNoMoreInteractions(mockCohort);
287     }
288
289     @Test
290     public void testOnDeleteEntries() {
291         ReplicatedLog replicatedLog = context.getReplicatedLog();
292         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
293         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
294         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
295
296         sendMessageToSupport(new DeleteEntries(1));
297
298         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
299         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
300     }
301
302     @Test
303     public void testUpdateElectionTerm() {
304
305         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
306
307         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
308         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
309     }
310
311     @Test
312     public void testDataRecoveredWithPersistenceDisabled() {
313         doNothing().when(mockCohort).applyRecoverySnapshot(any());
314         doReturn(false).when(mockPersistence).isRecoveryApplicable();
315         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
316
317         Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
318                 Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
319         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
320
321         sendMessageToSupport(snapshotOffer);
322
323         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
324
325         sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
326         sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
327
328         sendMessageToSupport(new ApplyJournalEntries(4));
329
330         sendMessageToSupport(new DeleteEntries(5));
331
332         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
333         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
334         assertEquals("Last applied", -1, context.getLastApplied());
335         assertEquals("Commit index", -1, context.getCommitIndex());
336         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
337         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
338
339         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
340         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
341
342         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
343
344         verify(mockCohort, never()).applyRecoverySnapshot(any());
345         verify(mockCohort, never()).getRestoreFromSnapshot();
346         verifyNoMoreInteractions(mockCohort);
347
348         verify(mockPersistentProvider).deleteMessages(10L);
349     }
350
351     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
352         return ArgumentMatchers.argThat(other ->
353                 term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
354     }
355
356     @Test
357     public void testNoDataRecoveredWithPersistenceDisabled() {
358         doReturn(false).when(mockPersistence).isRecoveryApplicable();
359
360         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
361
362         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
363         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
364
365         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
366
367         verify(mockCohort).getRestoreFromSnapshot();
368         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
369     }
370
371     @Test
372     public void testServerConfigurationPayloadApplied() {
373         String follower1 = "follower1";
374         String follower2 = "follower2";
375         String follower3 = "follower3";
376
377         context.addToPeers(follower1, null, VotingState.VOTING);
378         context.addToPeers(follower2, null, VotingState.VOTING);
379
380         //add new Server
381         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
382                 new ServerInfo(localId, true),
383                 new ServerInfo(follower1, true),
384                 new ServerInfo(follower2, false),
385                 new ServerInfo(follower3, true)));
386
387         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
388
389         //verify new peers
390         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
391         assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
392                 Sets.newHashSet(context.getPeerIds()));
393         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
394         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
395         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
396
397         sendMessageToSupport(new ApplyJournalEntries(0));
398
399         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
400         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
401
402         //remove existing follower1
403         obj = new ServerConfigurationPayload(Arrays.asList(
404                 new ServerInfo(localId, true),
405                 new ServerInfo("follower2", true),
406                 new ServerInfo("follower3", true)));
407
408         sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
409
410         //verify new peers
411         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
412         assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
413     }
414
415     @Test
416     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
417         doReturn(false).when(mockPersistence).isRecoveryApplicable();
418
419         String follower = "follower";
420         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
421                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
422
423         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
424
425         //verify new peers
426         assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
427     }
428
429     @Test
430     public void testOnSnapshotOfferWithServerConfiguration() {
431         long electionTerm = 2;
432         String electionVotedFor = "member-2";
433         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
434                 new ServerInfo(localId, true),
435                 new ServerInfo("follower1", true),
436                 new ServerInfo("follower2", true)));
437
438         MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
439         Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
440                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
441
442         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
443         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
444
445         sendMessageToSupport(snapshotOffer);
446
447         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
448         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
449         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
450         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
451         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
452                 Sets.newHashSet(context.getPeerIds()));
453     }
454 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.