Bug 2787: Batch AppendEntries to speed up follower sync
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RecoveryIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import akka.persistence.SaveSnapshotSuccess;
12 import com.google.common.collect.ImmutableMap;
13 import java.util.Arrays;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
18 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
19 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
20 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
21
22 /**
23  * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
24  *
25  * @author Thomas Pantelis
26  */
27 public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
28
29     private MockPayload payload0;
30     private MockPayload payload1;
31
32     @Before
33     public void setup() {
34         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
35                 newFollowerConfigParams());
36
37         peerAddresses = ImmutableMap.<String, String>builder().
38                 put(follower1Id, follower1Actor.path().toString()).build();
39
40         leaderConfigParams = newLeaderConfigParams();
41         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
42
43         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
44         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
45
46         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
47     }
48
49     @Test
50     public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
51
52         send2InitialPayloads();
53
54         // Block these messages initially so we can control the sequence.
55         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
56         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
57
58         MockPayload payload2 = sendPayloadData(leaderActor, "two");
59
60         // This should trigger a snapshot.
61         MockPayload payload3 = sendPayloadData(leaderActor, "three");
62
63         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
64
65         // Send another payload.
66         MockPayload payload4 = sendPayloadData(leaderActor, "four");
67
68         // Now deliver the AppendEntries to the follower
69         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
70
71         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
72
73         // Now deliver the CaptureSnapshotReply to the leader.
74         CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
75                 leaderCollectorActor, CaptureSnapshotReply.class);
76         leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
77         leaderActor.tell(captureSnapshotReply, leaderActor);
78
79         // Wait for snapshot complete.
80         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
81
82         reinstateLeaderActor();
83
84         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
85         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
86         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
87         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
88         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
89         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
90
91         assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
92                 leaderActor.underlyingActor().getState());
93     }
94
95     @Test
96     public void testStatePersistedAfterSnapshotPersisted() {
97
98         send2InitialPayloads();
99
100         // Block these messages initially so we can control the sequence.
101         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
102
103         MockPayload payload2 = sendPayloadData(leaderActor, "two");
104
105         // This should trigger a snapshot.
106         MockPayload payload3 = sendPayloadData(leaderActor, "three");
107
108         // Send another payload.
109         MockPayload payload4 = sendPayloadData(leaderActor, "four");
110
111         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
112
113         // Wait for snapshot complete.
114         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
115
116         // Now deliver the AppendEntries to the follower
117         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
118
119         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
120
121         reinstateLeaderActor();
122
123         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
124         assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
125         assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
126         assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
127         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
128         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
129
130         assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
131                 leaderActor.underlyingActor().getState());
132     }
133
134     private void reinstateLeaderActor() {
135         killActor(leaderActor);
136
137         leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
138
139         leaderActor.underlyingActor().waitForRecoveryComplete();
140
141         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
142     }
143
144     private void send2InitialPayloads() {
145         waitUntilLeader(leaderActor);
146         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
147
148         payload0 = sendPayloadData(leaderActor, "zero");
149
150         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
151
152         payload1 = sendPayloadData(leaderActor, "one");
153
154         // Verify the leader applies the states.
155         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
156
157         assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
158
159         MessageCollectorActor.clearMessages(leaderCollectorActor);
160         MessageCollectorActor.clearMessages(follower1CollectorActor);
161     }
162 }