76571137c1451863de949a6b070b54c965ba1add
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.ArgumentMatchers.anyInt;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSystem;
24 import akka.actor.Props;
25 import akka.persistence.RecoveryCompleted;
26 import akka.persistence.SnapshotMetadata;
27 import akka.persistence.SnapshotOffer;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import java.io.OutputStream;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Optional;
33 import java.util.Set;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.function.Consumer;
41 import org.junit.Assert;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.junit.runner.RunWith;
45 import org.mockito.ArgumentMatchers;
46 import org.mockito.InOrder;
47 import org.mockito.Mock;
48 import org.mockito.Mockito;
49 import org.mockito.junit.MockitoJUnitRunner;
50 import org.opendaylight.controller.cluster.DataPersistenceProvider;
51 import org.opendaylight.controller.cluster.PersistentDataProvider;
52 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
53 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
54 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
55 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
56 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
57 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
58 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
59 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
60 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
61 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
62 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 /**
67  * Unit tests for RaftActorRecoverySupport.
68  *
69  * @author Thomas Pantelis
70  */
71 @RunWith(MockitoJUnitRunner.StrictStubs.class)
72 public class RaftActorRecoverySupportTest {
73     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
74
75     @Mock
76     private DataPersistenceProvider mockPersistence;
77
78     @Mock
79     private RaftActorRecoveryCohort mockCohort;
80
81     @Mock
82     PersistentDataProvider mockPersistentProvider;
83
84     ActorRef mockActorRef;
85
86     ActorSystem mockActorSystem;
87
88     private RaftActorRecoverySupport support;
89
90     private RaftActorContext context;
91     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
92     private final String localId = "leader";
93
94     @Before
95     public void setup() {
96         mockActorSystem = ActorSystem.create();
97         mockActorRef = mockActorSystem.actorOf(Props.create(DoNothingActor.class));
98         context = new RaftActorContextImpl(mockActorRef, null, localId,
99                 new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1,
100                 Map.of(), configParams, mockPersistence, applyState -> {
101         }, LOG, MoreExecutors.directExecutor());
102
103         support = new RaftActorRecoverySupport(context, mockCohort);
104
105         doReturn(true).when(mockPersistence).isRecoveryApplicable();
106
107         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
108     }
109
110     private void sendMessageToSupport(final Object message) {
111         sendMessageToSupport(message, false);
112     }
113
114     private void sendMessageToSupport(final Object message, final boolean expComplete) {
115         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
116         assertEquals("complete", expComplete, complete);
117     }
118
119     @Test
120     public void testOnReplicatedLogEntry() {
121         ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
122
123         sendMessageToSupport(logEntry);
124
125         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
126         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
127         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
128         assertEquals("Last applied", -1, context.getLastApplied());
129         assertEquals("Commit index", -1, context.getCommitIndex());
130         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
131         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
132     }
133
134     @Test
135     public void testOnApplyJournalEntries() {
136         configParams.setJournalRecoveryLogBatchSize(5);
137
138         ReplicatedLog replicatedLog = context.getReplicatedLog();
139         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
140         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
141         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
142         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
143         replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
144         replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
145
146         sendMessageToSupport(new ApplyJournalEntries(2));
147
148         assertEquals("Last applied", 2, context.getLastApplied());
149         assertEquals("Commit index", 2, context.getCommitIndex());
150
151         sendMessageToSupport(new ApplyJournalEntries(4));
152
153         assertEquals("Last applied", 4, context.getLastApplied());
154         assertEquals("Last applied", 4, context.getLastApplied());
155
156         sendMessageToSupport(new ApplyJournalEntries(5));
157
158         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
159         assertEquals("Last applied", 5, context.getLastApplied());
160         assertEquals("Commit index", 5, context.getCommitIndex());
161         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
162         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
163
164         InOrder inOrder = Mockito.inOrder(mockCohort);
165         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
166
167         for (int i = 0; i < replicatedLog.size() - 1; i++) {
168             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
169         }
170
171         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
172         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
173         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
174
175         inOrder.verifyNoMoreInteractions();
176     }
177
178     @Test
179     public void testIncrementalRecovery() {
180         int recoverySnapshotInterval = 3;
181         int numberOfEntries = 5;
182         configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
183         Consumer<Optional<OutputStream>> mockSnapshotConsumer = mock(Consumer.class);
184         context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer);
185
186         ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor();
187         ReplicatedLog replicatedLog = context.getReplicatedLog();
188
189         for (int i = 0; i <= numberOfEntries; i++) {
190             replicatedLog.append(new SimpleReplicatedLogEntry(i, 1,
191                 new MockRaftActorContext.MockPayload(String.valueOf(i))));
192         }
193
194         AtomicInteger entryCount = new AtomicInteger();
195         ScheduledFuture<?> applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> {
196             int run = entryCount.getAndIncrement();
197             LOG.info("Sending entry number {}", run);
198             sendMessageToSupport(new ApplyJournalEntries(run));
199         }, 0, 1, TimeUnit.SECONDS);
200
201         ScheduledFuture<Boolean> canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false),
202             numberOfEntries, TimeUnit.SECONDS);
203         try {
204             canceller.get();
205             verify(mockSnapshotConsumer, times(1)).accept(any());
206             applyEntriesExecutor.shutdown();
207         } catch (InterruptedException | ExecutionException e) {
208             Assert.fail();
209         }
210     }
211
212     @Test
213     public void testOnSnapshotOffer() {
214
215         ReplicatedLog replicatedLog = context.getReplicatedLog();
216         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
217         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
218         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
219
220         ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
221                 new MockRaftActorContext.MockPayload("4", 4));
222
223         ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
224                 new MockRaftActorContext.MockPayload("5", 5));
225
226         long lastAppliedDuringSnapshotCapture = 3;
227         long lastIndexDuringSnapshotCapture = 5;
228         long electionTerm = 2;
229         String electionVotedFor = "member-2";
230
231         MockSnapshotState snapshotState = new MockSnapshotState(List.of(new MockPayload("1")));
232         Snapshot snapshot = Snapshot.create(snapshotState,
233                 List.of(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
234                 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
235
236         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
237         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
238
239         sendMessageToSupport(snapshotOffer);
240
241         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
242         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
243         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
244         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
245         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
246         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
247         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
248         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
249         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
250         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
251
252         verify(mockCohort).applyRecoverySnapshot(snapshotState);
253     }
254
255     @Test
256     public void testOnRecoveryCompletedWithRemainingBatch() {
257         ReplicatedLog replicatedLog = context.getReplicatedLog();
258         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
259         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
260
261         sendMessageToSupport(new ApplyJournalEntries(1));
262
263         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
264
265         assertEquals("Last applied", 1, context.getLastApplied());
266         assertEquals("Commit index", 1, context.getCommitIndex());
267
268         InOrder inOrder = Mockito.inOrder(mockCohort);
269         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
270
271         for (int i = 0; i < replicatedLog.size(); i++) {
272             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
273         }
274
275         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
276         inOrder.verify(mockCohort).getRestoreFromSnapshot();
277         inOrder.verifyNoMoreInteractions();
278     }
279
280     @Test
281     public void testOnRecoveryCompletedWithNoRemainingBatch() {
282         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
283
284         verify(mockCohort).getRestoreFromSnapshot();
285         verifyNoMoreInteractions(mockCohort);
286     }
287
288     @Test
289     public void testOnDeleteEntries() {
290         ReplicatedLog replicatedLog = context.getReplicatedLog();
291         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
292         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
293         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
294
295         sendMessageToSupport(new DeleteEntries(1));
296
297         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
298         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
299     }
300
301     @Test
302     public void testUpdateElectionTerm() {
303
304         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
305
306         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
307         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
308     }
309
310     @Test
311     public void testDataRecoveredWithPersistenceDisabled() {
312         doReturn(false).when(mockPersistence).isRecoveryApplicable();
313         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
314
315         Snapshot snapshot = Snapshot.create(new MockSnapshotState(List.of(new MockPayload("1"))),
316                 List.of(), 3, 1, 3, 1, -1, null, null);
317         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
318
319         sendMessageToSupport(snapshotOffer);
320
321         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
322
323         sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
324         sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
325
326         sendMessageToSupport(new ApplyJournalEntries(4));
327
328         sendMessageToSupport(new DeleteEntries(5));
329
330         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
331         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
332         assertEquals("Last applied", -1, context.getLastApplied());
333         assertEquals("Commit index", -1, context.getCommitIndex());
334         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
335         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
336
337         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
338         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
339
340         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
341
342         verify(mockCohort, never()).applyRecoverySnapshot(any());
343         verify(mockCohort, never()).getRestoreFromSnapshot();
344         verifyNoMoreInteractions(mockCohort);
345
346         verify(mockPersistentProvider).deleteMessages(10L);
347     }
348
349     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
350         return ArgumentMatchers.argThat(other ->
351                 term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
352     }
353
354     @Test
355     public void testNoDataRecoveredWithPersistenceDisabled() {
356         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
357
358         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
359         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
360
361         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
362
363         verify(mockCohort).getRestoreFromSnapshot();
364         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
365     }
366
367     @Test
368     public void testServerConfigurationPayloadApplied() {
369         String follower1 = "follower1";
370         String follower2 = "follower2";
371         String follower3 = "follower3";
372
373         context.addToPeers(follower1, null, VotingState.VOTING);
374         context.addToPeers(follower2, null, VotingState.VOTING);
375
376         //add new Server
377         ServerConfigurationPayload obj = new ServerConfigurationPayload(List.of(
378                 new ServerInfo(localId, true),
379                 new ServerInfo(follower1, true),
380                 new ServerInfo(follower2, false),
381                 new ServerInfo(follower3, true)));
382
383         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
384
385         //verify new peers
386         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
387         assertEquals("New peer Ids", Set.of(follower1, follower2, follower3), Set.copyOf(context.getPeerIds()));
388         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
389         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
390         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
391
392         sendMessageToSupport(new ApplyJournalEntries(0));
393
394         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
395         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
396
397         //remove existing follower1
398         obj = new ServerConfigurationPayload(List.of(
399                 new ServerInfo(localId, true),
400                 new ServerInfo("follower2", true),
401                 new ServerInfo("follower3", true)));
402
403         sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
404
405         //verify new peers
406         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
407         assertEquals("New peer Ids", Set.of(follower2, follower3), Set.copyOf(context.getPeerIds()));
408     }
409
410     @Test
411     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
412         doReturn(false).when(mockPersistence).isRecoveryApplicable();
413
414         String follower = "follower";
415         ServerConfigurationPayload obj = new ServerConfigurationPayload(List.of(
416                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
417
418         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
419
420         //verify new peers
421         assertEquals("New peer Ids", Set.of(follower), Set.copyOf(context.getPeerIds()));
422     }
423
424     @Test
425     public void testOnSnapshotOfferWithServerConfiguration() {
426         long electionTerm = 2;
427         String electionVotedFor = "member-2";
428         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(List.of(
429                 new ServerInfo(localId, true),
430                 new ServerInfo("follower1", true),
431                 new ServerInfo("follower2", true)));
432
433         MockSnapshotState snapshotState = new MockSnapshotState(List.of(new MockPayload("1")));
434         Snapshot snapshot = Snapshot.create(snapshotState, List.of(),
435                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
436
437         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
438         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
439
440         sendMessageToSupport(snapshotOffer);
441
442         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
443         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
444         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
445         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
446         assertEquals("Peer List", Set.of("follower1", "follower2"), Set.copyOf(context.getPeerIds()));
447     }
448 }