Bug 7362: Notify applyState synchronously
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.AdditionalMatchers.aryEq;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyInt;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21
22 import akka.persistence.RecoveryCompleted;
23 import akka.persistence.SnapshotMetadata;
24 import akka.persistence.SnapshotOffer;
25 import com.google.common.collect.Sets;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import org.hamcrest.Description;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.ArgumentMatcher;
32 import org.mockito.InOrder;
33 import org.mockito.Matchers;
34 import org.mockito.Mock;
35 import org.mockito.Mockito;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.DataPersistenceProvider;
38 import org.opendaylight.controller.cluster.PersistentDataProvider;
39 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
40 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
41 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
42 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
43 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
45 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Unit tests for RaftActorRecoverySupport.
51  *
52  * @author Thomas Pantelis
53  */
54 public class RaftActorRecoverySupportTest {
55
56     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
57
58     @Mock
59     private DataPersistenceProvider mockPersistence;
60
61
62     @Mock
63     private RaftActorRecoveryCohort mockCohort;
64
65     @Mock
66     PersistentDataProvider mockPersistentProvider;
67
68     private RaftActorRecoverySupport support;
69
70     private RaftActorContext context;
71     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
72     private final String localId = "leader";
73
74
75     @Before
76     public void setup() {
77         MockitoAnnotations.initMocks(this);
78
79         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
80                 LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
81                 mockPersistence, applyState -> { }, LOG);
82
83         support = new RaftActorRecoverySupport(context, mockCohort);
84
85         doReturn(true).when(mockPersistence).isRecoveryApplicable();
86
87         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
88     }
89
90     private void sendMessageToSupport(Object message) {
91         sendMessageToSupport(message, false);
92     }
93
94     private void sendMessageToSupport(Object message, boolean expComplete) {
95         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
96         assertEquals("complete", expComplete, complete);
97     }
98
99     @Test
100     public void testOnReplicatedLogEntry() {
101         ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5));
102
103         sendMessageToSupport(logEntry);
104
105         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
106         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
107         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
108         assertEquals("Last applied", -1, context.getLastApplied());
109         assertEquals("Commit index", -1, context.getCommitIndex());
110         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
111         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
112     }
113
114     @Test
115     public void testOnApplyJournalEntries() {
116         configParams.setJournalRecoveryLogBatchSize(5);
117
118         ReplicatedLog replicatedLog = context.getReplicatedLog();
119         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
120         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
121         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
122         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
123         replicatedLog.append(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
124         replicatedLog.append(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
125
126         sendMessageToSupport(new ApplyJournalEntries(2));
127
128         assertEquals("Last applied", 2, context.getLastApplied());
129         assertEquals("Commit index", 2, context.getCommitIndex());
130
131         sendMessageToSupport(new ApplyJournalEntries(4));
132
133         assertEquals("Last applied", 4, context.getLastApplied());
134         assertEquals("Last applied", 4, context.getLastApplied());
135
136         sendMessageToSupport(new ApplyJournalEntries(5));
137
138         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
139         assertEquals("Last applied", 5, context.getLastApplied());
140         assertEquals("Commit index", 5, context.getCommitIndex());
141         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
142         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
143
144         InOrder inOrder = Mockito.inOrder(mockCohort);
145         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
146
147         for (int i = 0; i < replicatedLog.size() - 1; i++) {
148             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
149         }
150
151         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
152         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
153         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
154
155         inOrder.verifyNoMoreInteractions();
156     }
157
158     @Test
159     public void testOnSnapshotOffer() {
160
161         ReplicatedLog replicatedLog = context.getReplicatedLog();
162         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
163         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
164         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
165
166         byte[] snapshotBytes = {1,2,3,4,5};
167
168         ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
169                 new MockRaftActorContext.MockPayload("4", 4));
170
171         ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
172                 new MockRaftActorContext.MockPayload("5", 5));
173
174         long lastAppliedDuringSnapshotCapture = 3;
175         long lastIndexDuringSnapshotCapture = 5;
176         long electionTerm = 2;
177         String electionVotedFor = "member-2";
178
179         Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
180                 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
181
182         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
183         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
184
185         sendMessageToSupport(snapshotOffer);
186
187         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
188         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
189         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
190         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
191         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
192         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
193         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
194         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
195         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
196         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
197
198         verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
199     }
200
201     @Test
202     public void testOnRecoveryCompletedWithRemainingBatch() {
203         ReplicatedLog replicatedLog = context.getReplicatedLog();
204         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
205         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
206
207         sendMessageToSupport(new ApplyJournalEntries(1));
208
209         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
210
211         assertEquals("Last applied", 1, context.getLastApplied());
212         assertEquals("Commit index", 1, context.getCommitIndex());
213
214         InOrder inOrder = Mockito.inOrder(mockCohort);
215         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
216
217         for (int i = 0; i < replicatedLog.size(); i++) {
218             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
219         }
220
221         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
222         inOrder.verify(mockCohort).getRestoreFromSnapshot();
223         inOrder.verifyNoMoreInteractions();
224     }
225
226     @Test
227     public void testOnRecoveryCompletedWithNoRemainingBatch() {
228         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
229
230         verify(mockCohort).getRestoreFromSnapshot();
231         verifyNoMoreInteractions(mockCohort);
232     }
233
234     @Test
235     public void testOnDeleteEntries() {
236         ReplicatedLog replicatedLog = context.getReplicatedLog();
237         replicatedLog.append(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("0")));
238         replicatedLog.append(new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")));
239         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
240
241         sendMessageToSupport(new DeleteEntries(1));
242
243         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
244         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
245     }
246
247     @Test
248     public void testUpdateElectionTerm() {
249
250         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
251
252         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
253         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
254     }
255
256     @Test
257     public void testDataRecoveredWithPersistenceDisabled() {
258         doNothing().when(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
259         doReturn(false).when(mockPersistence).isRecoveryApplicable();
260         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
261
262         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
263         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
264
265         sendMessageToSupport(snapshotOffer);
266
267         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
268
269         sendMessageToSupport(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("4")));
270         sendMessageToSupport(new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("5")));
271
272         sendMessageToSupport(new ApplyJournalEntries(4));
273
274         sendMessageToSupport(new DeleteEntries(5));
275
276         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
277         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
278         assertEquals("Last applied", -1, context.getLastApplied());
279         assertEquals("Commit index", -1, context.getCommitIndex());
280         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
281         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
282
283         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
284         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
285
286         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
287
288         verify(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
289         verify(mockCohort, never()).getRestoreFromSnapshot();
290         verifyNoMoreInteractions(mockCohort);
291
292         verify(mockPersistentProvider).deleteMessages(10L);
293     }
294
295     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
296         return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
297             @Override
298             public boolean matches(Object argument) {
299                 UpdateElectionTerm other = (UpdateElectionTerm) argument;
300                 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
301             }
302
303             @Override
304             public void describeTo(Description description) {
305                 description.appendValue(new UpdateElectionTerm(term, votedFor));
306             }
307         });
308     }
309
310     @Test
311     public void testNoDataRecoveredWithPersistenceDisabled() {
312         doReturn(false).when(mockPersistence).isRecoveryApplicable();
313
314         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
315
316         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
317         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
318
319         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
320
321         verify(mockCohort).getRestoreFromSnapshot();
322         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
323     }
324
325     @Test
326     public void testServerConfigurationPayloadApplied() {
327         String follower1 = "follower1";
328         String follower2 = "follower2";
329         String follower3 = "follower3";
330
331         context.addToPeers(follower1, null, VotingState.VOTING);
332         context.addToPeers(follower2, null, VotingState.VOTING);
333
334         //add new Server
335         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
336                 new ServerInfo(localId, true),
337                 new ServerInfo(follower1, true),
338                 new ServerInfo(follower2, false),
339                 new ServerInfo(follower3, true)));
340
341         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
342
343         //verify new peers
344         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
345         assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
346                 Sets.newHashSet(context.getPeerIds()));
347         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
348         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
349         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
350
351         sendMessageToSupport(new ApplyJournalEntries(0));
352
353         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
354         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
355
356         //remove existing follower1
357         obj = new ServerConfigurationPayload(Arrays.asList(
358                 new ServerInfo(localId, true),
359                 new ServerInfo("follower2", true),
360                 new ServerInfo("follower3", true)));
361
362         sendMessageToSupport(new SimpleReplicatedLogEntry(1, 1, obj));
363
364         //verify new peers
365         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
366         assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
367     }
368
369     @Test
370     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
371         doReturn(false).when(mockPersistence).isRecoveryApplicable();
372
373         String follower = "follower";
374         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
375                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
376
377         sendMessageToSupport(new SimpleReplicatedLogEntry(0, 1, obj));
378
379         //verify new peers
380         assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
381     }
382
383     @Test
384     public void testOnSnapshotOfferWithServerConfiguration() {
385         long electionTerm = 2;
386         String electionVotedFor = "member-2";
387         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
388                                                         new ServerInfo(localId, true),
389                                                         new ServerInfo("follower1", true),
390                                                         new ServerInfo("follower2", true)));
391
392         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
393                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
394
395         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
396         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
397
398         sendMessageToSupport(snapshotOffer);
399
400         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
401         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
402         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
403         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
404         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
405             Sets.newHashSet(context.getPeerIds()));
406     }
407 }