akka.actor.provider set to 'cluster'
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.ArgumentMatchers.anyInt;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSystem;
24 import akka.actor.Props;
25 import akka.persistence.RecoveryCompleted;
26 import akka.persistence.SnapshotMetadata;
27 import akka.persistence.SnapshotOffer;
28 import akka.testkit.javadsl.TestKit;
29 import com.google.common.util.concurrent.MoreExecutors;
30 import java.io.OutputStream;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Optional;
34 import java.util.Set;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ScheduledFuture;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Consumer;
42 import org.junit.After;
43 import org.junit.Assert;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.junit.runner.RunWith;
47 import org.mockito.ArgumentMatchers;
48 import org.mockito.InOrder;
49 import org.mockito.Mock;
50 import org.mockito.Mockito;
51 import org.mockito.junit.MockitoJUnitRunner;
52 import org.opendaylight.controller.cluster.DataPersistenceProvider;
53 import org.opendaylight.controller.cluster.PersistentDataProvider;
54 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
55 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
56 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
57 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
58 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
59 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
60 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
61 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
62 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
63 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
64 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 /**
69  * Unit tests for RaftActorRecoverySupport.
70  *
71  * @author Thomas Pantelis
72  */
73 @RunWith(MockitoJUnitRunner.StrictStubs.class)
74 public class RaftActorRecoverySupportTest {
75     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
76
77     @Mock
78     private DataPersistenceProvider mockPersistence;
79
80     @Mock
81     private RaftActorRecoveryCohort mockCohort;
82
83     @Mock
84     PersistentDataProvider mockPersistentProvider;
85
86     ActorRef mockActorRef;
87
88     ActorSystem mockActorSystem;
89
90     private RaftActorRecoverySupport support;
91
92     private RaftActorContext context;
93     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
94     private final String localId = "leader";
95
96     @Before
97     public void setup() {
98         mockActorSystem = ActorSystem.create();
99         mockActorRef = mockActorSystem.actorOf(Props.create(DoNothingActor.class));
100         context = new RaftActorContextImpl(mockActorRef, null, localId,
101                 new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1,
102                 Map.of(), configParams, mockPersistence, applyState -> {
103         }, LOG, MoreExecutors.directExecutor());
104
105         support = new RaftActorRecoverySupport(context, mockCohort);
106
107         doReturn(true).when(mockPersistence).isRecoveryApplicable();
108
109         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
110     }
111
112     @After
113     public void tearDown() {
114         TestKit.shutdownActorSystem(mockActorSystem);
115     }
116
117     private void sendMessageToSupport(final Object message) {
118         sendMessageToSupport(message, false);
119     }
120
121     private void sendMessageToSupport(final Object message, final boolean expComplete) {
122         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
123         assertEquals("complete", expComplete, complete);
124     }
125
126     @Test
127     public void testOnReplicatedLogEntry() {
128         ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
129
130         sendMessageToSupport(logEntry);
131
132         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
133         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
134         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
135         assertEquals("Last applied", -1, context.getLastApplied());
136         assertEquals("Commit index", -1, context.getCommitIndex());
137         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
138         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
139     }
140
141     @Test
142     public void testOnApplyJournalEntries() {
143         configParams.setJournalRecoveryLogBatchSize(5);
144
145         ReplicatedLog replicatedLog = context.getReplicatedLog();
146         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
147         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
148         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
149         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
150         replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
151         replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
152
153         sendMessageToSupport(new ApplyJournalEntries(2));
154
155         assertEquals("Last applied", 2, context.getLastApplied());
156         assertEquals("Commit index", 2, context.getCommitIndex());
157
158         sendMessageToSupport(new ApplyJournalEntries(4));
159
160         assertEquals("Last applied", 4, context.getLastApplied());
161         assertEquals("Last applied", 4, context.getLastApplied());
162
163         sendMessageToSupport(new ApplyJournalEntries(5));
164
165         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
166         assertEquals("Last applied", 5, context.getLastApplied());
167         assertEquals("Commit index", 5, context.getCommitIndex());
168         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
169         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
170
171         InOrder inOrder = Mockito.inOrder(mockCohort);
172         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
173
174         for (int i = 0; i < replicatedLog.size() - 1; i++) {
175             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
176         }
177
178         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
179         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
180         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
181
182         inOrder.verifyNoMoreInteractions();
183     }
184
185     @Test
186     public void testIncrementalRecovery() {
187         int recoverySnapshotInterval = 3;
188         int numberOfEntries = 5;
189         configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
190         Consumer<Optional<OutputStream>> mockSnapshotConsumer = mock(Consumer.class);
191         context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer);
192
193         ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor();
194         ReplicatedLog replicatedLog = context.getReplicatedLog();
195
196         for (int i = 0; i <= numberOfEntries; i++) {
197             replicatedLog.append(new SimpleReplicatedLogEntry(i, 1,
198                 new MockRaftActorContext.MockPayload(String.valueOf(i))));
199         }
200
201         AtomicInteger entryCount = new AtomicInteger();
202         ScheduledFuture<?> applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> {
203             int run = entryCount.getAndIncrement();
204             LOG.info("Sending entry number {}", run);
205             sendMessageToSupport(new ApplyJournalEntries(run));
206         }, 0, 1, TimeUnit.SECONDS);
207
208         ScheduledFuture<Boolean> canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false),
209             numberOfEntries, TimeUnit.SECONDS);
210         try {
211             canceller.get();
212             verify(mockSnapshotConsumer, times(1)).accept(any());
213             applyEntriesExecutor.shutdown();
214         } catch (InterruptedException | ExecutionException e) {
215             Assert.fail();
216         }
217     }
218
219     @Test
220     public void testOnSnapshotOffer() {
221
222         ReplicatedLog replicatedLog = context.getReplicatedLog();
223         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
224         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
225         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
226
227         ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
228                 new MockRaftActorContext.MockPayload("4", 4));
229
230         ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
231                 new MockRaftActorContext.MockPayload("5", 5));
232
233         long lastAppliedDuringSnapshotCapture = 3;
234         long lastIndexDuringSnapshotCapture = 5;
235         long electionTerm = 2;
236         String electionVotedFor = "member-2";
237
238         MockSnapshotState snapshotState = new MockSnapshotState(List.of(new MockPayload("1")));
239         Snapshot snapshot = Snapshot.create(snapshotState,
240                 List.of(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
241                 lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
242
243         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
244         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
245
246         sendMessageToSupport(snapshotOffer);
247
248         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
249         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
250         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
251         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
252         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
253         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
254         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
255         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
256         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
257         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
258
259         verify(mockCohort).applyRecoverySnapshot(snapshotState);
260     }
261
262     @Test
263     public void testOnRecoveryCompletedWithRemainingBatch() {
264         ReplicatedLog replicatedLog = context.getReplicatedLog();
265         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
266         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
267
268         sendMessageToSupport(new ApplyJournalEntries(1));
269
270         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
271
272         assertEquals("Last applied", 1, context.getLastApplied());
273         assertEquals("Commit index", 1, context.getCommitIndex());
274
275         InOrder inOrder = Mockito.inOrder(mockCohort);
276         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
277
278         for (int i = 0; i < replicatedLog.size(); i++) {
279             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
280         }
281
282         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
283         inOrder.verify(mockCohort).getRestoreFromSnapshot();
284         inOrder.verifyNoMoreInteractions();
285     }
286
287     @Test
288     public void testOnRecoveryCompletedWithNoRemainingBatch() {
289         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
290
291         verify(mockCohort).getRestoreFromSnapshot();
292         verifyNoMoreInteractions(mockCohort);
293     }
294
295     @Test
296     public void testOnDeleteEntries() {
297         ReplicatedLog replicatedLog = context.getReplicatedLog();
298         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
299         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
300         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
301
302         sendMessageToSupport(new DeleteEntries(1));
303
304         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
305         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
306     }
307
308     @Test
309     public void testUpdateElectionTerm() {
310
311         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
312
313         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
314         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
315     }
316
317     @Test
318     public void testDataRecoveredWithPersistenceDisabled() {
319         doReturn(false).when(mockPersistence).isRecoveryApplicable();
320         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
321
322         Snapshot snapshot = Snapshot.create(new MockSnapshotState(List.of(new MockPayload("1"))),
323                 List.of(), 3, 1, 3, 1, -1, null, null);
324         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
325
326         sendMessageToSupport(snapshotOffer);
327
328         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
329
330         sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
331         sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
332
333         sendMessageToSupport(new ApplyJournalEntries(4));
334
335         sendMessageToSupport(new DeleteEntries(5));
336
337         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
338         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
339         assertEquals("Last applied", -1, context.getLastApplied());
340         assertEquals("Commit index", -1, context.getCommitIndex());
341         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
342         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
343
344         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
345         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
346
347         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
348
349         verify(mockCohort, never()).applyRecoverySnapshot(any());
350         verify(mockCohort, never()).getRestoreFromSnapshot();
351         verifyNoMoreInteractions(mockCohort);
352
353         verify(mockPersistentProvider).deleteMessages(10L);
354     }
355
356     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
357         return ArgumentMatchers.argThat(other ->
358                 term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor()));
359     }
360
361     @Test
362     public void testNoDataRecoveredWithPersistenceDisabled() {
363         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
364
365         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
366         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
367
368         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
369
370         verify(mockCohort).getRestoreFromSnapshot();
371         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
372     }
373
374     @Test
375     public void testServerConfigurationPayloadApplied() {
376         String follower1 = "follower1";
377         String follower2 = "follower2";
378         String follower3 = "follower3";
379
380         context.addToPeers(follower1, null, VotingState.VOTING);
381         context.addToPeers(follower2, null, VotingState.VOTING);
382
383         //add new Server
384         ServerConfigurationPayload obj = new ServerConfigurationPayload(List.of(
385                 new ServerInfo(localId, true),
386                 new ServerInfo(follower1, true),
387                 new ServerInfo(follower2, false),
388                 new ServerInfo(follower3, true)));
389
390         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
391
392         //verify new peers
393         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
394         assertEquals("New peer Ids", Set.of(follower1, follower2, follower3), Set.copyOf(context.getPeerIds()));
395         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
396         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
397         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
398
399         sendMessageToSupport(new ApplyJournalEntries(0));
400
401         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
402         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
403
404         //remove existing follower1
405         obj = new ServerConfigurationPayload(List.of(
406                 new ServerInfo(localId, true),
407                 new ServerInfo("follower2", true),
408                 new ServerInfo("follower3", true)));
409
410         sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
411
412         //verify new peers
413         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
414         assertEquals("New peer Ids", Set.of(follower2, follower3), Set.copyOf(context.getPeerIds()));
415     }
416
417     @Test
418     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
419         doReturn(false).when(mockPersistence).isRecoveryApplicable();
420
421         String follower = "follower";
422         ServerConfigurationPayload obj = new ServerConfigurationPayload(List.of(
423                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
424
425         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
426
427         //verify new peers
428         assertEquals("New peer Ids", Set.of(follower), Set.copyOf(context.getPeerIds()));
429     }
430
431     @Test
432     public void testOnSnapshotOfferWithServerConfiguration() {
433         long electionTerm = 2;
434         String electionVotedFor = "member-2";
435         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(List.of(
436                 new ServerInfo(localId, true),
437                 new ServerInfo("follower1", true),
438                 new ServerInfo("follower2", true)));
439
440         MockSnapshotState snapshotState = new MockSnapshotState(List.of(new MockPayload("1")));
441         Snapshot snapshot = Snapshot.create(snapshotState, List.of(),
442                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
443
444         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
445         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata, snapshot);
446
447         sendMessageToSupport(snapshotOffer);
448
449         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
450         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
451         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
452         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
453         assertEquals("Peer List", Set.of("follower1", "follower2"), Set.copyOf(context.getPeerIds()));
454     }
455 }