*/
package org.opendaylight.controller.cluster.raft;
-import static akka.pattern.Patterns.ask;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.dispatch.Mailboxes;
+import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.OutputStream;
+import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
}
}
+ /**
+ * Message intended for testing to allow triggering persistData via the mailbox.
+ */
+ public static final class TestPersist {
+
+ private final ActorRef actorRef;
+ private final Identifier identifier;
+ private final Payload payload;
+
+ TestPersist(final ActorRef actorRef, final Identifier identifier, final Payload payload) {
+ this.actorRef = actorRef;
+ this.identifier = identifier;
+ this.payload = payload;
+ }
+
+ public ActorRef getActorRef() {
+ return actorRef;
+ }
+
+ public Identifier getIdentifier() {
+ return identifier;
+ }
+
+ public Payload getPayload() {
+ return payload;
+ }
+ }
+
public static class TestRaftActor extends MockRaftActor {
private final ActorRef collectorActor;
TestRaftActor(final Builder builder) {
super(builder);
- this.collectorActor = builder.collectorActor;
+ collectorActor = builder.collectorActor;
}
public void startDropMessages(final Class<?> msgClass) {
@SuppressWarnings({ "rawtypes", "unchecked", "checkstyle:IllegalCatch" })
@Override
public void handleCommand(final Object message) {
- if (message instanceof MockPayload) {
- MockPayload payload = (MockPayload) message;
+ if (message instanceof MockPayload payload) {
super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false);
return;
}
- if (message instanceof ServerConfigurationPayload) {
- super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload) message, false);
+ if (message instanceof ServerConfigurationPayload payload) {
+ super.persistData(collectorActor, new MockIdentifier("serverConfig"), payload, false);
+ return;
+ }
+
+ if (message instanceof SetPeerAddress setPeerAddress) {
+ setPeerAddress(setPeerAddress.getPeerId(), setPeerAddress.getPeerAddress());
return;
}
- if (message instanceof SetPeerAddress) {
- setPeerAddress(((SetPeerAddress) message).getPeerId(),
- ((SetPeerAddress) message).getPeerAddress());
+ if (message instanceof TestPersist testPersist) {
+ persistData(testPersist.getActorRef(), testPersist.getIdentifier(), testPersist.getPayload(), false);
return;
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
- MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
+ MockSnapshotState snapshotState = new MockSnapshotState(List.copyOf(getState()));
if (installSnapshotStream.isPresent()) {
- SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
+ SerializationUtils.serialize(snapshotState, installSnapshotStream.orElseThrow());
}
actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
}
public Builder collectorActor(final ActorRef newCollectorActor) {
- this.collectorActor = newCollectorActor;
+ collectorActor = newCollectorActor;
return this;
}
}
}
- protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+ // FIXME: this is an arbitrary limit. Document interactions and/or improve them to improve maintainability
+ protected static final int MAXIMUM_MESSAGE_SLICE_SIZE = 700;
protected final Logger testLog = LoggerFactory.getLogger(getClass());
protected String follower2Id = factory.generateActorId("follower");
protected TestActorRef<TestRaftActor> follower2Actor;
protected ActorRef follower2CollectorActor;
- protected RaftActorBehavior follower2;
+ protected RaftActorBehavior follower2;
protected RaftActorContext follower2Context;
- protected ImmutableMap<String, String> peerAddresses;
+ protected Map<String, String> peerAddresses;
protected long initialTerm = 5;
protected long currentTerm;
protected int snapshotBatchCount = 4;
- protected int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
+ protected int maximumMessageSliceSize = MAXIMUM_MESSAGE_SLICE_SIZE;
protected List<MockPayload> expSnapshotState = new ArrayList<>();
configParams.setSnapshotBatchCount(snapshotBatchCount);
configParams.setSnapshotDataThresholdPercentage(70);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- configParams.setSnapshotChunkSize(snapshotChunkSize);
+ configParams.setMaximumMessageSliceSize(maximumMessageSliceSize);
return configParams;
}
protected TestActorRef<TestRaftActor> newTestRaftActor(final String id, final Map<String, String> newPeerAddresses,
final ConfigParams configParams) {
return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(newPeerAddresses != null
- ? newPeerAddresses : Collections.<String, String>emptyMap()).config(configParams));
+ ? newPeerAddresses : Map.of()).config(configParams));
}
protected TestActorRef<TestRaftActor> newTestRaftActor(final String id, final TestRaftActor.Builder builder) {
testkit.watch(actor);
actor.tell(PoisonPill.getInstance(), null);
- testkit.expectMsgClass(testkit.duration("5 seconds"), Terminated.class);
+ testkit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
testkit.unwatch(actor);
}
}
protected void verifySnapshot(final String prefix, final Snapshot snapshot, final long lastAppliedTerm,
- final long lastAppliedIndex, final long lastTerm, final long lastIndex)
- throws Exception {
+ final long lastAppliedIndex, final long lastTerm, final long lastIndex) {
assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex());
assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
try {
- OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
+ OnDemandRaftState raftState = (OnDemandRaftState)Await.result(Patterns.ask(raftActor,
GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
verifier.accept(raftState);
return;