Fix intermittent failures in FollowerTest
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractRaftActorBehaviorTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNull;
14
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.testkit.TestActorRef;
18 import com.google.protobuf.ByteString;
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.io.ObjectOutputStream;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Map;
25 import org.junit.After;
26 import org.junit.Assert;
27 import org.junit.Test;
28 import org.opendaylight.controller.cluster.raft.AbstractActorTest;
29 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
30 import org.opendaylight.controller.cluster.raft.RaftActorContext;
31 import org.opendaylight.controller.cluster.raft.RaftState;
32 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
33 import org.opendaylight.controller.cluster.raft.TestActorFactory;
34 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
35 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
36 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
37 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
39 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
40 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
41 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
42 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
43 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
44 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
45 import org.slf4j.LoggerFactory;
46
47 public abstract class AbstractRaftActorBehaviorTest<T extends RaftActorBehavior> extends AbstractActorTest {
48
49     protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
50
51     private final TestActorRef<MessageCollectorActor> behaviorActor = actorFactory.createTestActor(
52             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("behavior"));
53
54     RaftActorBehavior behavior;
55
56     @After
57     public void tearDown() throws Exception {
58         if (behavior != null) {
59             behavior.close();
60         }
61
62         actorFactory.close();
63
64         InMemoryJournal.clear();
65         InMemorySnapshotStore.clear();
66     }
67
68     /**
69      * This test checks that when a new Raft RPC message is received with a newer
70      * term the RaftActor gets into the Follower state.
71      */
72     @Test
73     public void testHandleRaftRPCWithNewerTerm() throws Exception {
74         MockRaftActorContext actorContext = createActorContext();
75
76         assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
77                 createAppendEntriesWithNewerTerm());
78
79         assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
80                 createAppendEntriesReplyWithNewerTerm());
81
82         assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
83                 createRequestVoteWithNewerTerm());
84
85         assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
86                 createRequestVoteReplyWithNewerTerm());
87     }
88
89
90     /**
91      * This test verifies that when an AppendEntries is received with a term that
92      * is less that the currentTerm of the RaftActor then the RaftActor does not
93      * change it's state and it responds back with a failure.
94      */
95     @Test
96     public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() throws Exception {
97         MockRaftActorContext context = createActorContext();
98         short payloadVersion = 5;
99         context.setPayloadVersion(payloadVersion);
100
101         // First set the receivers term to a high number (1000)
102         context.getTermInformation().update(1000, "test");
103
104         AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101, -1, (short)4);
105
106         behavior = createBehavior(context);
107
108         RaftState expected = behavior.state();
109
110         RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
111
112         assertEquals("Raft state", expected, raftBehavior.state());
113
114         // Also expect an AppendEntriesReply to be sent where success is false
115
116         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
117                 behaviorActor, AppendEntriesReply.class);
118
119         assertEquals("isSuccess", false, reply.isSuccess());
120         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
121     }
122
123
124     @Test
125     public void testHandleAppendEntriesAddSameEntryToLog() throws Exception {
126         MockRaftActorContext context = createActorContext();
127
128         context.getTermInformation().update(2, "test");
129
130         // Prepare the receivers log
131         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("zero");
132         setLastLogEntry(context, 2, 0, payload);
133
134         List<ReplicatedLogEntry> entries = new ArrayList<>();
135         entries.add(new SimpleReplicatedLogEntry(0, 2, payload));
136
137         final AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0);
138
139         behavior = createBehavior(context);
140
141         assertFalse("This test should be overridden when testing Candidate", behavior instanceof Candidate);
142
143         RaftState expected = behavior.state();
144
145         // Check that the behavior does not handle unknwon message
146         assertNull(behavior.handleMessage(behaviorActor, "unknown"));
147
148         RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
149
150         assertEquals("Raft state", expected, raftBehavior.state());
151
152         assertEquals("ReplicatedLog size", 1, context.getReplicatedLog().size());
153
154         handleAppendEntriesAddSameEntryToLogReply(behaviorActor);
155     }
156
157     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
158             throws Exception {
159         AppendEntriesReply reply = MessageCollectorActor.getFirstMatching(replyActor, AppendEntriesReply.class);
160         Assert.assertNull("Expected no AppendEntriesReply", reply);
161     }
162
163     /**
164      * This test verifies that when a RequestVote is received by the RaftActor
165      * with the senders' log is more up to date than the receiver that the receiver grants
166      * the vote to the sender.
167      */
168     @Test
169     public void testHandleRequestVoteWhenSenderLogMoreUpToDate() {
170         MockRaftActorContext context = createActorContext();
171
172         behavior = createBehavior(context);
173
174         context.getTermInformation().update(1, "test");
175
176         behavior.handleMessage(behaviorActor, new RequestVote(context.getTermInformation().getCurrentTerm(),
177                 "test", 10000, 999));
178
179         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(behaviorActor,
180                 RequestVoteReply.class);
181         assertEquals("isVoteGranted", true, reply.isVoteGranted());
182     }
183
184     /**
185      * This test verifies that when a RaftActor receives a RequestVote message
186      * with a term that is greater than it's currentTerm but a less up-to-date
187      * log then the receiving RaftActor will not grant the vote to the sender.
188      */
189     @Test
190     public void testHandleRequestVoteWhenSenderLogLessUptoDate() {
191         MockRaftActorContext context = createActorContext();
192
193         behavior = createBehavior(context);
194
195         context.getTermInformation().update(1, "test");
196
197         int index = 2000;
198         setLastLogEntry(context, context.getTermInformation().getCurrentTerm(), index,
199                 new MockRaftActorContext.MockPayload(""));
200
201         behavior.handleMessage(behaviorActor, new RequestVote(
202                 context.getTermInformation().getCurrentTerm(), "test",
203                 index - 1, context.getTermInformation().getCurrentTerm()));
204
205         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(behaviorActor,
206                 RequestVoteReply.class);
207         assertEquals("isVoteGranted", false, reply.isVoteGranted());
208     }
209
210
211
212     /**
213      * This test verifies that the receiving RaftActor will not grant a vote
214      * to a sender if the sender's term is lesser than the currentTerm of the
215      * recipient RaftActor.
216      */
217     @Test
218     public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm() {
219         MockRaftActorContext context = createActorContext();
220
221         context.getTermInformation().update(1000, null);
222
223         behavior = createBehavior(context);
224
225         behavior.handleMessage(behaviorActor, new RequestVote(999, "test", 10000, 999));
226
227         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(behaviorActor,
228                 RequestVoteReply.class);
229         assertEquals("isVoteGranted", false, reply.isVoteGranted());
230     }
231
232     @Test
233     public void testPerformSnapshot() {
234         MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
235         AbstractRaftActorBehavior abstractBehavior =  (AbstractRaftActorBehavior) createBehavior(context);
236         if (abstractBehavior instanceof Candidate) {
237             return;
238         }
239
240         context.getTermInformation().update(1, "test");
241
242         //log has 1 entry with replicatedToAllIndex = 0, does not do anything, returns the
243         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
244         context.setLastApplied(0);
245         abstractBehavior.performSnapshotWithoutCapture(0);
246         assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
247         assertEquals(1, context.getReplicatedLog().size());
248
249         //2 entries, lastApplied still 0, no purging.
250         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
251         context.setLastApplied(0);
252         abstractBehavior.performSnapshotWithoutCapture(0);
253         assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
254         assertEquals(2, context.getReplicatedLog().size());
255
256         //2 entries, lastApplied still 0, no purging.
257         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
258         context.setLastApplied(1);
259         abstractBehavior.performSnapshotWithoutCapture(0);
260         assertEquals(0, abstractBehavior.getReplicatedToAllIndex());
261         assertEquals(1, context.getReplicatedLog().size());
262
263         // 5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and
264         // 1 will only get purged
265         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 5, 1).build());
266         context.setLastApplied(2);
267         abstractBehavior.performSnapshotWithoutCapture(3);
268         assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
269         assertEquals(3, context.getReplicatedLog().size());
270
271         // scenario where Last applied > Replicated to all index (becoz of a slow follower)
272         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
273         context.setLastApplied(2);
274         abstractBehavior.performSnapshotWithoutCapture(1);
275         assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
276         assertEquals(1, context.getReplicatedLog().size());
277     }
278
279
280     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
281             ActorRef actorRef, RaftRPC rpc) throws Exception {
282
283         Payload payload = new MockRaftActorContext.MockPayload("");
284         setLastLogEntry(actorContext, 1, 0, payload);
285         actorContext.getTermInformation().update(1, "test");
286
287         RaftActorBehavior origBehavior = createBehavior(actorContext);
288         RaftActorBehavior raftBehavior = origBehavior.handleMessage(actorRef, rpc);
289
290         assertEquals("New raft state", RaftState.Follower, raftBehavior.state());
291         assertEquals("New election term", rpc.getTerm(), actorContext.getTermInformation().getCurrentTerm());
292
293         origBehavior.close();
294         raftBehavior.close();
295     }
296
297     protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
298         MockRaftActorContext actorContext, long term, long index, Payload data) {
299         return setLastLogEntry(actorContext, new SimpleReplicatedLogEntry(index, term, data));
300     }
301
302     protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(MockRaftActorContext actorContext,
303             ReplicatedLogEntry logEntry) {
304         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
305         log.append(logEntry);
306         actorContext.setReplicatedLog(log);
307
308         return log;
309     }
310
311     protected abstract T createBehavior(RaftActorContext actorContext);
312
313     protected final T createBehavior(MockRaftActorContext actorContext) {
314         T ret = createBehavior((RaftActorContext)actorContext);
315         actorContext.setCurrentBehavior(ret);
316         return ret;
317     }
318
319     protected RaftActorBehavior createBehavior() {
320         return createBehavior(createActorContext());
321     }
322
323     protected MockRaftActorContext createActorContext() {
324         return new MockRaftActorContext();
325     }
326
327     protected MockRaftActorContext createActorContext(ActorRef actor) {
328         return new MockRaftActorContext("test", getSystem(), actor);
329     }
330
331     protected AppendEntries createAppendEntriesWithNewerTerm() {
332         return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1, (short)0);
333     }
334
335     protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
336         return new AppendEntriesReply("follower-1", 100, false, 100, 100, (short)0);
337     }
338
339     protected RequestVote createRequestVoteWithNewerTerm() {
340         return new RequestVote(100, "candidate-1", 10, 100);
341     }
342
343     protected RequestVoteReply createRequestVoteReplyWithNewerTerm() {
344         return new RequestVoteReply(100, false);
345     }
346
347     protected ByteString toByteString(Map<String, String> state) {
348         ByteArrayOutputStream bos = new ByteArrayOutputStream();
349         try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
350             oos.writeObject(state);
351             return ByteString.copyFrom(bos.toByteArray());
352         } catch (IOException e) {
353             throw new AssertionError("IOException occurred converting Map to Bytestring", e);
354         }
355     }
356
357     protected void logStart(String name) {
358         LoggerFactory.getLogger(getClass()).info("Starting " + name);
359     }
360
361     protected RaftPolicy createRaftPolicy(final boolean automaticElectionsEnabled,
362                                           final boolean applyModificationToStateBeforeConsensus) {
363         return new RaftPolicy() {
364             @Override
365             public boolean automaticElectionsEnabled() {
366                 return automaticElectionsEnabled;
367             }
368
369             @Override
370             public boolean applyModificationToStateBeforeConsensus() {
371                 return applyModificationToStateBeforeConsensus;
372             }
373         };
374     }
375 }