*/
package org.opendaylight.controller.cluster.raft;
+import static akka.pattern.Patterns.ask;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import akka.actor.ActorRef;
import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
import org.junit.After;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.util.AbstractStringIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;
/**
public static class TestRaftActor extends MockRaftActor {
private final TestActorRef<MessageCollectorActor> collectorActor;
- private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
+ private final Map<Class<?>, Predicate<?>> dropMessages = new ConcurrentHashMap<>();
private TestRaftActor(Builder builder) {
super(builder);
}
public void startDropMessages(Class<?> msgClass) {
- dropMessages.put(msgClass, Boolean.TRUE);
+ dropMessages.put(msgClass, msg -> true);
+ }
+
+ <T> void startDropMessages(Class<T> msgClass, Predicate<T> filter) {
+ dropMessages.put(msgClass, filter);
}
public void stopDropMessages(Class<?> msgClass) {
getRaftActorContext().setTotalMemoryRetriever(mockTotalMemory > 0 ? () -> mockTotalMemory : null);
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void handleCommand(Object message) {
if(message instanceof MockPayload) {
}
try {
- if(!dropMessages.containsKey(message.getClass())) {
+ Predicate drop = dropMessages.get(message.getClass());
+ if(drop == null || !drop.test(message)) {
super.handleCommand(message);
}
} finally {
protected DefaultConfigParamsImpl newLeaderConfigParams() {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
- configParams.setElectionTimeoutFactor(1);
+ configParams.setElectionTimeoutFactor(4);
configParams.setSnapshotBatchCount(snapshotBatchCount);
configParams.setSnapshotDataThresholdPercentage(70);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
assertEquals(name + " replicatedToAllIndex", replicatedToAllIndex,
actor.getCurrentBehavior().getReplicatedToAllIndex());
}
+
+ static void verifyRaftState(ActorRef raftActor, Consumer<OnDemandRaftState> verifier) {
+ Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
+ AssertionError lastError = null;
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ try {
+ OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
+ GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
+ verifier.accept(raftState);
+ return;
+ } catch (AssertionError e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ lastError = new AssertionError("OnDemandRaftState failed", e);
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ throw lastError;
+ }
}