2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static akka.pattern.Patterns.ask;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
14 import akka.actor.ActorRef;
15 import akka.actor.InvalidActorNameException;
16 import akka.actor.PoisonPill;
17 import akka.actor.Terminated;
18 import akka.dispatch.Dispatchers;
19 import akka.dispatch.Mailboxes;
20 import akka.testkit.TestActorRef;
21 import akka.testkit.javadsl.TestKit;
22 import akka.util.Timeout;
23 import com.google.common.base.Stopwatch;
24 import com.google.common.collect.ImmutableMap;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.io.OutputStream;
27 import java.time.Duration;
28 import java.util.ArrayList;
29 import java.util.Collections;
30 import java.util.List;
32 import java.util.Optional;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.TimeUnit;
35 import java.util.function.Consumer;
36 import java.util.function.Predicate;
37 import org.apache.commons.lang3.SerializationUtils;
38 import org.junit.After;
39 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
40 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
42 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
44 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
45 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
46 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
47 import org.opendaylight.controller.cluster.raft.messages.Payload;
48 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
49 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
50 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
51 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
52 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
53 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
54 import org.opendaylight.yangtools.concepts.Identifier;
55 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import scala.concurrent.Await;
59 import scala.concurrent.duration.FiniteDuration;
62 * Abstract base for an integration test that tests end-to-end RaftActor and behavior functionality.
64 * @author Thomas Pantelis
66 public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest {
68 private static final class MockIdentifier extends AbstractStringIdentifier<MockIdentifier> {
69 private static final long serialVersionUID = 1L;
71 protected MockIdentifier(final String string) {
76 public static class SetPeerAddress {
77 private final String peerId;
78 private final String peerAddress;
80 public SetPeerAddress(final String peerId, final String peerAddress) {
82 this.peerAddress = peerAddress;
85 public String getPeerId() {
89 public String getPeerAddress() {
95 * Message intended for testing to allow triggering persistData via the mailbox.
97 public static final class TestPersist {
99 private final ActorRef actorRef;
100 private final Identifier identifier;
101 private final Payload payload;
103 TestPersist(final ActorRef actorRef, final Identifier identifier, final Payload payload) {
104 this.actorRef = actorRef;
105 this.identifier = identifier;
106 this.payload = payload;
109 public ActorRef getActorRef() {
113 public Identifier getIdentifier() {
117 public Payload getPayload() {
122 public static class TestRaftActor extends MockRaftActor {
124 private final ActorRef collectorActor;
125 private final Map<Class<?>, Predicate<?>> dropMessages = new ConcurrentHashMap<>();
127 TestRaftActor(final Builder builder) {
129 collectorActor = builder.collectorActor;
132 public void startDropMessages(final Class<?> msgClass) {
133 dropMessages.put(msgClass, msg -> true);
136 <T> void startDropMessages(final Class<T> msgClass, final Predicate<T> filter) {
137 dropMessages.put(msgClass, filter);
140 public void stopDropMessages(final Class<?> msgClass) {
141 dropMessages.remove(msgClass);
144 void setMockTotalMemory(final long mockTotalMemory) {
145 getRaftActorContext().setTotalMemoryRetriever(mockTotalMemory > 0 ? () -> mockTotalMemory : null);
148 @SuppressWarnings({ "rawtypes", "unchecked", "checkstyle:IllegalCatch" })
150 public void handleCommand(final Object message) {
151 if (message instanceof MockPayload payload) {
152 super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false);
156 if (message instanceof ServerConfigurationPayload payload) {
157 super.persistData(collectorActor, new MockIdentifier("serverConfig"), payload, false);
161 if (message instanceof SetPeerAddress setPeerAddress) {
162 setPeerAddress(setPeerAddress.getPeerId(), setPeerAddress.getPeerAddress());
166 if (message instanceof TestPersist testPersist) {
167 persistData(testPersist.getActorRef(), testPersist.getIdentifier(), testPersist.getPayload(), false);
172 Predicate drop = dropMessages.get(message.getClass());
173 if (drop == null || !drop.test(message)) {
174 super.handleCommand(message);
177 if (!(message instanceof SendHeartBeat)) {
179 collectorActor.tell(message, ActorRef.noSender());
180 } catch (Exception e) {
181 LOG.error("MessageCollectorActor error", e);
188 @SuppressWarnings("checkstyle:IllegalCatch")
189 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
190 MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
191 if (installSnapshotStream.isPresent()) {
192 SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
195 actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
198 public ActorRef collectorActor() {
199 return collectorActor;
202 public static Builder newBuilder() {
203 return new Builder();
206 public static class Builder extends AbstractBuilder<Builder, TestRaftActor> {
207 private ActorRef collectorActor;
210 super(TestRaftActor.class);
213 public Builder collectorActor(final ActorRef newCollectorActor) {
214 collectorActor = newCollectorActor;
220 // FIXME: this is an arbitrary limit. Document interactions and/or improve them to improve maintainability
221 protected static final int SNAPSHOT_CHUNK_SIZE = 700;
223 protected final Logger testLog = LoggerFactory.getLogger(getClass());
225 protected final TestActorFactory factory = new TestActorFactory(getSystem());
227 protected String leaderId = factory.generateActorId("leader");
228 protected DefaultConfigParamsImpl leaderConfigParams;
229 protected TestActorRef<TestRaftActor> leaderActor;
230 protected ActorRef leaderCollectorActor;
231 protected RaftActorContext leaderContext;
232 protected RaftActorBehavior leader;
234 protected String follower1Id = factory.generateActorId("follower");
235 protected TestActorRef<TestRaftActor> follower1Actor;
236 protected ActorRef follower1CollectorActor;
237 protected RaftActorBehavior follower1;
238 protected RaftActorContext follower1Context;
240 protected String follower2Id = factory.generateActorId("follower");
241 protected TestActorRef<TestRaftActor> follower2Actor;
242 protected ActorRef follower2CollectorActor;
243 protected RaftActorBehavior follower2;
244 protected RaftActorContext follower2Context;
246 protected ImmutableMap<String, String> peerAddresses;
248 protected long initialTerm = 5;
249 protected long currentTerm;
251 protected int snapshotBatchCount = 4;
252 protected int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
254 protected List<MockPayload> expSnapshotState = new ArrayList<>();
257 public void tearDown() {
258 InMemoryJournal.clear();
259 InMemorySnapshotStore.clear();
263 protected DefaultConfigParamsImpl newLeaderConfigParams() {
264 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
265 configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
266 configParams.setElectionTimeoutFactor(4);
267 configParams.setSnapshotBatchCount(snapshotBatchCount);
268 configParams.setSnapshotDataThresholdPercentage(70);
269 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
270 configParams.setSnapshotChunkSize(snapshotChunkSize);
274 protected DefaultConfigParamsImpl newFollowerConfigParams() {
275 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
276 configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
277 configParams.setElectionTimeoutFactor(1000);
281 protected void waitUntilLeader(final ActorRef actorRef) {
282 RaftActorTestKit.waitUntilLeader(actorRef);
285 protected TestActorRef<TestRaftActor> newTestRaftActor(final String id, final Map<String, String> newPeerAddresses,
286 final ConfigParams configParams) {
287 return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(newPeerAddresses != null
288 ? newPeerAddresses : Collections.<String, String>emptyMap()).config(configParams));
291 protected TestActorRef<TestRaftActor> newTestRaftActor(final String id, final TestRaftActor.Builder builder) {
292 builder.collectorActor(factory.createActor(
293 MessageCollectorActor.props(), factory.generateActorId(id + "-collector"))).id(id);
295 InvalidActorNameException lastEx = null;
296 for (int i = 0; i < 10; i++) {
298 return factory.createTestActor(builder.props().withDispatcher(Dispatchers.DefaultDispatcherId())
299 .withMailbox(Mailboxes.DefaultMailboxId()), id);
300 } catch (InvalidActorNameException e) {
302 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
306 assertNotNull(lastEx);
310 protected void killActor(final TestActorRef<TestRaftActor> actor) {
311 TestKit testkit = new TestKit(getSystem());
312 testkit.watch(actor);
314 actor.tell(PoisonPill.getInstance(), null);
315 testkit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
317 testkit.unwatch(actor);
320 protected void verifyApplyJournalEntries(final ActorRef actor, final long expIndex) {
321 MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class,
322 msg -> msg.getToIndex() == expIndex);
325 protected void verifySnapshot(final String prefix, final Snapshot snapshot, final long lastAppliedTerm,
326 final long lastAppliedIndex, final long lastTerm, final long lastIndex) {
327 assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
328 assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex());
329 assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
330 assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
332 List<Object> actualState = ((MockSnapshotState)snapshot.getState()).getState();
333 assertEquals(String.format("%s Snapshot getState size. Expected %s: . Actual: %s", prefix, expSnapshotState,
334 actualState), expSnapshotState.size(), actualState.size());
335 for (int i = 0; i < expSnapshotState.size(); i++) {
336 assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i));
340 protected void verifyPersistedJournal(final String persistenceId,
341 final List<? extends ReplicatedLogEntry> expJournal) {
342 List<ReplicatedLogEntry> journal = InMemoryJournal.get(persistenceId, ReplicatedLogEntry.class);
343 assertEquals("Journal ReplicatedLogEntry count", expJournal.size(), journal.size());
344 for (int i = 0; i < expJournal.size(); i++) {
345 ReplicatedLogEntry expected = expJournal.get(i);
346 ReplicatedLogEntry actual = journal.get(i);
347 verifyReplicatedLogEntry(expected, actual.getTerm(), actual.getIndex(), actual.getData());
351 protected MockPayload sendPayloadData(final ActorRef actor, final String data) {
352 return sendPayloadData(actor, data, 0);
355 protected MockPayload sendPayloadData(final ActorRef actor, final String data, final int size) {
358 payload = new MockPayload(data, size);
360 payload = new MockPayload(data);
363 actor.tell(payload, ActorRef.noSender());
367 protected void verifyApplyState(final ApplyState applyState, final ActorRef expClientActor,
368 final String expId, final long expTerm, final long expIndex, final Payload payload) {
369 assertEquals("ApplyState getClientActor", expClientActor, applyState.getClientActor());
371 final Identifier id = expId == null ? null : new MockIdentifier(expId);
372 assertEquals("ApplyState getIdentifier", id, applyState.getIdentifier());
373 ReplicatedLogEntry replicatedLogEntry = applyState.getReplicatedLogEntry();
374 verifyReplicatedLogEntry(replicatedLogEntry, expTerm, expIndex, payload);
377 protected void verifyReplicatedLogEntry(final ReplicatedLogEntry replicatedLogEntry, final long expTerm,
378 final long expIndex, final Payload payload) {
379 assertEquals("ReplicatedLogEntry getTerm", expTerm, replicatedLogEntry.getTerm());
380 assertEquals("ReplicatedLogEntry getIndex", expIndex, replicatedLogEntry.getIndex());
381 assertEquals("ReplicatedLogEntry getData", payload, replicatedLogEntry.getData());
384 protected String testActorPath(final String id) {
385 return factory.createTestActorPath(id);
388 protected void verifyLeadersTrimmedLog(final long lastIndex) {
389 verifyTrimmedLog("Leader", leaderActor, lastIndex, lastIndex - 1);
392 protected void verifyLeadersTrimmedLog(final long lastIndex, final long replicatedToAllIndex) {
393 verifyTrimmedLog("Leader", leaderActor, lastIndex, replicatedToAllIndex);
396 protected void verifyFollowersTrimmedLog(final int num, final TestActorRef<TestRaftActor> actorRef,
397 final long lastIndex) {
398 verifyTrimmedLog("Follower " + num, actorRef, lastIndex, lastIndex - 1);
401 protected void verifyTrimmedLog(final String name, final TestActorRef<TestRaftActor> actorRef, final long lastIndex,
402 final long replicatedToAllIndex) {
403 TestRaftActor actor = actorRef.underlyingActor();
404 RaftActorContext context = actor.getRaftActorContext();
405 long snapshotIndex = lastIndex - 1;
406 assertEquals(name + " snapshot term", snapshotIndex < 0 ? -1 : currentTerm,
407 context.getReplicatedLog().getSnapshotTerm());
408 assertEquals(name + " snapshot index", snapshotIndex, context.getReplicatedLog().getSnapshotIndex());
409 assertEquals(name + " journal log size", 1, context.getReplicatedLog().size());
410 assertEquals(name + " journal last index", lastIndex, context.getReplicatedLog().lastIndex());
411 assertEquals(name + " commit index", lastIndex, context.getCommitIndex());
412 assertEquals(name + " last applied", lastIndex, context.getLastApplied());
413 assertEquals(name + " replicatedToAllIndex", replicatedToAllIndex,
414 actor.getCurrentBehavior().getReplicatedToAllIndex());
417 @SuppressWarnings("checkstyle:IllegalCatch")
418 static void verifyRaftState(final ActorRef raftActor, final Consumer<OnDemandRaftState> verifier) {
419 Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
420 AssertionError lastError = null;
421 Stopwatch sw = Stopwatch.createStarted();
422 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
424 OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
425 GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
426 verifier.accept(raftState);
428 } catch (AssertionError e) {
430 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
431 } catch (Exception e) {
432 lastError = new AssertionError("OnDemandRaftState failed", e);
433 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);