BUG-5626: remove ApplyLogEntries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupportTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.verify;
18 import static org.mockito.Mockito.verifyNoMoreInteractions;
19 import akka.japi.Procedure;
20 import akka.persistence.RecoveryCompleted;
21 import akka.persistence.SnapshotMetadata;
22 import akka.persistence.SnapshotOffer;
23 import akka.persistence.SnapshotSelectionCriteria;
24 import com.google.common.collect.Sets;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import org.hamcrest.Description;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.ArgumentMatcher;
31 import org.mockito.InOrder;
32 import org.mockito.Matchers;
33 import org.mockito.Mock;
34 import org.mockito.Mockito;
35 import org.mockito.MockitoAnnotations;
36 import org.opendaylight.controller.cluster.DataPersistenceProvider;
37 import org.opendaylight.controller.cluster.PersistentDataProvider;
38 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
40 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
42 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Unit tests for RaftActorRecoverySupport.
48  *
49  * @author Thomas Pantelis
50  */
51 public class RaftActorRecoverySupportTest {
52
53     private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
54
55     @Mock
56     private DataPersistenceProvider mockPersistence;
57
58
59     @Mock
60     private RaftActorRecoveryCohort mockCohort;
61
62     @Mock
63     PersistentDataProvider mockPersistentProvider;
64
65     private RaftActorRecoverySupport support;
66
67     private RaftActorContext context;
68     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
69     private final String localId = "leader";
70
71
72     @Before
73     public void setup() {
74         MockitoAnnotations.initMocks(this);
75
76         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
77                 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
78
79         support = new RaftActorRecoverySupport(context, mockCohort);
80
81         doReturn(true).when(mockPersistence).isRecoveryApplicable();
82
83         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
84     }
85
86     private void sendMessageToSupport(Object message) {
87         sendMessageToSupport(message, false);
88     }
89
90     private void sendMessageToSupport(Object message, boolean expComplete) {
91         boolean complete = support.handleRecoveryMessage(message, mockPersistentProvider);
92         assertEquals("complete", expComplete, complete);
93     }
94
95     @Test
96     public void testOnReplicatedLogEntry() {
97         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
98                 1, new MockRaftActorContext.MockPayload("1", 5));
99
100         sendMessageToSupport(logEntry);
101
102         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
103         assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
104         assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
105         assertEquals("Last applied", -1, context.getLastApplied());
106         assertEquals("Commit index", -1, context.getCommitIndex());
107         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
108         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
109     }
110
111     @Test
112     public void testOnApplyJournalEntries() {
113         configParams.setJournalRecoveryLogBatchSize(5);
114
115         ReplicatedLog replicatedLog = context.getReplicatedLog();
116         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
117                 0, new MockRaftActorContext.MockPayload("0")));
118         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
119                 1, new MockRaftActorContext.MockPayload("1")));
120         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
121                 2, new MockRaftActorContext.MockPayload("2")));
122         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
123                 3, new MockRaftActorContext.MockPayload("3")));
124         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
125                 4, new MockRaftActorContext.MockPayload("4")));
126         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
127                 5, new MockRaftActorContext.MockPayload("5")));
128
129         sendMessageToSupport(new ApplyJournalEntries(2));
130
131         assertEquals("Last applied", 2, context.getLastApplied());
132         assertEquals("Commit index", 2, context.getCommitIndex());
133
134         sendMessageToSupport(new ApplyJournalEntries(4));
135
136         assertEquals("Last applied", 4, context.getLastApplied());
137         assertEquals("Last applied", 4, context.getLastApplied());
138
139         sendMessageToSupport(new ApplyJournalEntries(5));
140
141         assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
142         assertEquals("Last applied", 5, context.getLastApplied());
143         assertEquals("Commit index", 5, context.getCommitIndex());
144         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
145         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
146
147         InOrder inOrder = Mockito.inOrder(mockCohort);
148         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
149
150         for(int i = 0; i < replicatedLog.size() - 1; i++) {
151             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
152         }
153
154         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
155         inOrder.verify(mockCohort).startLogRecoveryBatch(5);
156         inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
157
158         inOrder.verifyNoMoreInteractions();
159     }
160
161     @Test
162     public void testOnSnapshotOffer() {
163
164         ReplicatedLog replicatedLog = context.getReplicatedLog();
165         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
166                 1, new MockRaftActorContext.MockPayload("1")));
167         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
168                 2, new MockRaftActorContext.MockPayload("2")));
169         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
170                 3, new MockRaftActorContext.MockPayload("3")));
171
172         byte[] snapshotBytes = {1,2,3,4,5};
173
174         ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
175                 4, new MockRaftActorContext.MockPayload("4", 4));
176
177         ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
178                 5, new MockRaftActorContext.MockPayload("5", 5));
179
180         long lastAppliedDuringSnapshotCapture = 3;
181         long lastIndexDuringSnapshotCapture = 5;
182         long electionTerm = 2;
183         String electionVotedFor = "member-2";
184
185         Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
186                 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
187
188         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
189         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
190
191         sendMessageToSupport(snapshotOffer);
192
193         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
194         assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
195         assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
196         assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
197         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
198         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
199         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
200         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
201         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
202         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
203
204         verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
205     }
206
207     @Test
208     public void testOnRecoveryCompletedWithRemainingBatch() {
209         ReplicatedLog replicatedLog = context.getReplicatedLog();
210         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
211                 0, new MockRaftActorContext.MockPayload("0")));
212         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
213                 1, new MockRaftActorContext.MockPayload("1")));
214
215         sendMessageToSupport(new ApplyJournalEntries(1));
216
217         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
218
219         assertEquals("Last applied", 1, context.getLastApplied());
220         assertEquals("Commit index", 1, context.getCommitIndex());
221
222         InOrder inOrder = Mockito.inOrder(mockCohort);
223         inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
224
225         for(int i = 0; i < replicatedLog.size(); i++) {
226             inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
227         }
228
229         inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
230         inOrder.verify(mockCohort).getRestoreFromSnapshot();
231         inOrder.verifyNoMoreInteractions();
232     }
233
234     @Test
235     public void testOnRecoveryCompletedWithNoRemainingBatch() {
236         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
237
238         verify(mockCohort).getRestoreFromSnapshot();
239         verifyNoMoreInteractions(mockCohort);
240     }
241
242     @Test
243     public void testOnDeprecatedDeleteEntries() {
244         ReplicatedLog replicatedLog = context.getReplicatedLog();
245         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
246                 0, new MockRaftActorContext.MockPayload("0")));
247         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
248                 1, new MockRaftActorContext.MockPayload("1")));
249         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
250                 2, new MockRaftActorContext.MockPayload("2")));
251
252         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
253
254         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
255         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
256     }
257
258     @Test
259     public void testOnDeleteEntries() {
260         ReplicatedLog replicatedLog = context.getReplicatedLog();
261         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
262                 0, new MockRaftActorContext.MockPayload("0")));
263         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
264                 1, new MockRaftActorContext.MockPayload("1")));
265         replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
266                 2, new MockRaftActorContext.MockPayload("2")));
267
268         sendMessageToSupport(new DeleteEntries(1));
269
270         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
271         assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
272     }
273
274     @Test
275     public void testUpdateElectionTerm() {
276
277         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
278
279         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
280         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
281     }
282
283     @Test
284     public void testDeprecatedUpdateElectionTerm() {
285
286         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(5, "member2"));
287
288         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
289         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
290     }
291
292     @SuppressWarnings("unchecked")
293     @Test
294     public void testDataRecoveredWithPersistenceDisabled() {
295         doReturn(false).when(mockPersistence).isRecoveryApplicable();
296         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
297
298         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
299
300         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
301         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
302
303         sendMessageToSupport(snapshotOffer);
304
305         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
306                 4, new MockRaftActorContext.MockPayload("4")));
307         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
308                 5, new MockRaftActorContext.MockPayload("5")));
309
310         sendMessageToSupport(new ApplyJournalEntries(4));
311
312         sendMessageToSupport(new DeleteEntries(5));
313
314         sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
315
316         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
317         assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
318         assertEquals("Last applied", -1, context.getLastApplied());
319         assertEquals("Commit index", -1, context.getCommitIndex());
320         assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
321         assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
322
323         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
324         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
325
326         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
327
328         verify(mockCohort).getRestoreFromSnapshot();
329         verifyNoMoreInteractions(mockCohort);
330
331         verify(mockPersistentProvider).deleteMessages(10L);
332         verify(mockPersistentProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
333         verify(mockPersistentProvider).persist(updateElectionTerm(5, "member2"), any(Procedure.class));
334     }
335
336     static UpdateElectionTerm updateElectionTerm(final long term, final String votedFor) {
337         return Matchers.argThat(new ArgumentMatcher<UpdateElectionTerm>() {
338             @Override
339             public boolean matches(Object argument) {
340                 UpdateElectionTerm other = (UpdateElectionTerm) argument;
341                 return term == other.getCurrentTerm() && votedFor.equals(other.getVotedFor());
342             }
343
344             @Override
345             public void describeTo(Description description) {
346                 description.appendValue(new UpdateElectionTerm(term, votedFor));
347             }
348         });
349     }
350
351     @Test
352     public void testNoDataRecoveredWithPersistenceDisabled() {
353         doReturn(false).when(mockPersistence).isRecoveryApplicable();
354
355         sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
356
357         assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
358         assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
359
360         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
361
362         verify(mockCohort).getRestoreFromSnapshot();
363         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
364     }
365
366     @Test
367     public void testServerConfigurationPayloadApplied() {
368         String follower1 = "follower1";
369         String follower2 = "follower2";
370         String follower3 = "follower3";
371
372         context.addToPeers(follower1, null, VotingState.VOTING);
373         context.addToPeers(follower2, null, VotingState.VOTING);
374
375         //add new Server
376         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
377                 new ServerInfo(localId, true),
378                 new ServerInfo(follower1, true),
379                 new ServerInfo(follower2, false),
380                 new ServerInfo(follower3, true)));
381
382         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
383
384         //verify new peers
385         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
386         assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
387                 Sets.newHashSet(context.getPeerIds()));
388         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
389         assertEquals("follower2 isVoting", false, context.getPeerInfo(follower2).isVoting());
390         assertEquals("follower3 isVoting", true, context.getPeerInfo(follower3).isVoting());
391
392         sendMessageToSupport(new ApplyJournalEntries(0));
393
394         verify(mockCohort, never()).startLogRecoveryBatch(anyInt());
395         verify(mockCohort, never()).appendRecoveredLogEntry(any(Payload.class));
396
397         //remove existing follower1
398         obj = new ServerConfigurationPayload(Arrays.asList(
399                 new ServerInfo(localId, true),
400                 new ServerInfo("follower2", true),
401                 new ServerInfo("follower3", true)));
402
403         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
404
405         //verify new peers
406         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
407         assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
408     }
409
410     @Test
411     public void testServerConfigurationPayloadAppliedWithPersistenceDisabled() {
412         doReturn(false).when(mockPersistence).isRecoveryApplicable();
413
414         String follower = "follower";
415         ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
416                 new ServerInfo(localId, true), new ServerInfo(follower, true)));
417
418         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
419
420         //verify new peers
421         assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
422     }
423
424     @Test
425     public void testOnSnapshotOfferWithServerConfiguration() {
426         long electionTerm = 2;
427         String electionVotedFor = "member-2";
428         ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
429                                                         new ServerInfo(localId, true),
430                                                         new ServerInfo("follower1", true),
431                                                         new ServerInfo("follower2", true)));
432
433         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
434                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
435
436         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
437         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
438
439         sendMessageToSupport(snapshotOffer);
440
441         assertEquals("Journal log size", 0, context.getReplicatedLog().size());
442         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
443         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
444         assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
445         assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
446             Sets.newHashSet(context.getPeerIds()));
447     }
448 }