*/
package org.opendaylight.controller.cluster.raft;
-import static akka.pattern.Patterns.ask;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.dispatch.Mailboxes;
+import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.OutputStream;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
TestRaftActor(final Builder builder) {
super(builder);
- this.collectorActor = builder.collectorActor;
+ collectorActor = builder.collectorActor;
}
public void startDropMessages(final Class<?> msgClass) {
@SuppressWarnings({ "rawtypes", "unchecked", "checkstyle:IllegalCatch" })
@Override
public void handleCommand(final Object message) {
- if (message instanceof MockPayload) {
- MockPayload payload = (MockPayload) message;
+ if (message instanceof MockPayload payload) {
super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false);
return;
}
- if (message instanceof ServerConfigurationPayload) {
- super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload) message, false);
+ if (message instanceof ServerConfigurationPayload payload) {
+ super.persistData(collectorActor, new MockIdentifier("serverConfig"), payload, false);
return;
}
- if (message instanceof SetPeerAddress) {
- setPeerAddress(((SetPeerAddress) message).getPeerId(),
- ((SetPeerAddress) message).getPeerAddress());
+ if (message instanceof SetPeerAddress setPeerAddress) {
+ setPeerAddress(setPeerAddress.getPeerId(), setPeerAddress.getPeerAddress());
return;
}
- if (message instanceof TestPersist) {
- persistData(((TestPersist) message).getActorRef(), ((TestPersist) message).getIdentifier(),
- ((TestPersist) message).getPayload(), false);
+ if (message instanceof TestPersist testPersist) {
+ persistData(testPersist.getActorRef(), testPersist.getIdentifier(), testPersist.getPayload(), false);
return;
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
- MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
+ MockSnapshotState snapshotState = new MockSnapshotState(List.copyOf(getState()));
if (installSnapshotStream.isPresent()) {
- SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
+ SerializationUtils.serialize(snapshotState, installSnapshotStream.orElseThrow());
}
actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
}
public Builder collectorActor(final ActorRef newCollectorActor) {
- this.collectorActor = newCollectorActor;
+ collectorActor = newCollectorActor;
return this;
}
}
}
- protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+ // FIXME: this is an arbitrary limit. Document interactions and/or improve them to improve maintainability
+ protected static final int MAXIMUM_MESSAGE_SLICE_SIZE = 700;
protected final Logger testLog = LoggerFactory.getLogger(getClass());
protected String follower2Id = factory.generateActorId("follower");
protected TestActorRef<TestRaftActor> follower2Actor;
protected ActorRef follower2CollectorActor;
- protected RaftActorBehavior follower2;
+ protected RaftActorBehavior follower2;
protected RaftActorContext follower2Context;
- protected ImmutableMap<String, String> peerAddresses;
+ protected Map<String, String> peerAddresses;
protected long initialTerm = 5;
protected long currentTerm;
protected int snapshotBatchCount = 4;
- protected int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
+ protected int maximumMessageSliceSize = MAXIMUM_MESSAGE_SLICE_SIZE;
protected List<MockPayload> expSnapshotState = new ArrayList<>();
configParams.setSnapshotBatchCount(snapshotBatchCount);
configParams.setSnapshotDataThresholdPercentage(70);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- configParams.setSnapshotChunkSize(snapshotChunkSize);
+ configParams.setMaximumMessageSliceSize(maximumMessageSliceSize);
return configParams;
}
protected TestActorRef<TestRaftActor> newTestRaftActor(final String id, final Map<String, String> newPeerAddresses,
final ConfigParams configParams) {
return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(newPeerAddresses != null
- ? newPeerAddresses : Collections.<String, String>emptyMap()).config(configParams));
+ ? newPeerAddresses : Map.of()).config(configParams));
}
protected TestActorRef<TestRaftActor> newTestRaftActor(final String id, final TestRaftActor.Builder builder) {
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
try {
- OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
+ OnDemandRaftState raftState = (OnDemandRaftState)Await.result(Patterns.ask(raftActor,
GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
verifier.accept(raftState);
return;