X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FAbstractRaftActorIntegrationTest.java;h=1119a3260a140888b0645040b66aaa8f65f65d99;hb=8049fd4d06da0f4616180e46fbbe95f98cf698ea;hp=343febbcfd534afeba21ab76cfcfdeab66cd394b;hpb=9d5ec5cdd146a56bc03e35b6718e9492a5c8410a;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 343febbcfd..1119a3260a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -7,8 +7,10 @@ */ package org.opendaylight.controller.cluster.raft; +import static akka.pattern.Patterns.ask; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; + import akka.actor.ActorRef; import akka.actor.InvalidActorNameException; import akka.actor.PoisonPill; @@ -16,22 +18,33 @@ import akka.actor.Terminated; import akka.dispatch.Dispatchers; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import akka.util.Timeout; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Predicate; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; +import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; @@ -40,6 +53,7 @@ import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.util.AbstractStringIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Await; import scala.concurrent.duration.FiniteDuration; /** @@ -78,7 +92,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest public static class TestRaftActor extends MockRaftActor { private final TestActorRef collectorActor; - private final Map, Boolean> dropMessages = new ConcurrentHashMap<>(); + private final Map, Predicate> dropMessages = new ConcurrentHashMap<>(); private TestRaftActor(Builder builder) { super(builder); @@ -86,7 +100,11 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest } public void startDropMessages(Class msgClass) { - dropMessages.put(msgClass, Boolean.TRUE); + dropMessages.put(msgClass, msg -> true); + } + + void startDropMessages(Class msgClass, Predicate filter) { + dropMessages.put(msgClass, filter); } public void stopDropMessages(Class msgClass) { @@ -97,31 +115,33 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest getRaftActorContext().setTotalMemoryRetriever(mockTotalMemory > 0 ? () -> mockTotalMemory : null); } + @SuppressWarnings({ "rawtypes", "unchecked", "checkstyle:IllegalCatch" }) @Override public void handleCommand(Object message) { - if(message instanceof MockPayload) { - MockPayload payload = (MockPayload)message; - super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload); + if (message instanceof MockPayload) { + MockPayload payload = (MockPayload) message; + super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false); return; } - if(message instanceof ServerConfigurationPayload) { - super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload)message); + if (message instanceof ServerConfigurationPayload) { + super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload) message, false); return; } - if(message instanceof SetPeerAddress) { + if (message instanceof SetPeerAddress) { setPeerAddress(((SetPeerAddress) message).getPeerId().toString(), ((SetPeerAddress) message).getPeerAddress()); return; } try { - if(!dropMessages.containsKey(message.getClass())) { + Predicate drop = dropMessages.get(message.getClass()); + if (drop == null || !drop.test(message)) { super.handleCommand(message); } } finally { - if(!(message instanceof SendHeartBeat)) { + if (!(message instanceof SendHeartBeat)) { try { collectorActor.tell(message, ActorRef.noSender()); } catch (Exception e) { @@ -132,12 +152,14 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest } @Override - public void createSnapshot(ActorRef actorRef) { - try { - actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef); - } catch (Exception e) { - e.printStackTrace(); + @SuppressWarnings("checkstyle:IllegalCatch") + public void createSnapshot(ActorRef actorRef, Optional installSnapshotStream) { + MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState())); + if (installSnapshotStream.isPresent()) { + SerializationUtils.serialize(snapshotState, installSnapshotStream.get()); } + + actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef); } public ActorRef collectorActor() { @@ -151,8 +173,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest public static class Builder extends AbstractBuilder { private TestActorRef collectorActor; - public Builder collectorActor(TestActorRef collectorActor) { - this.collectorActor = collectorActor; + public Builder collectorActor(TestActorRef newCollectorActor) { + this.collectorActor = newCollectorActor; return this; } @@ -206,7 +228,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest protected DefaultConfigParamsImpl newLeaderConfigParams() { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); - configParams.setElectionTimeoutFactor(1); + configParams.setElectionTimeoutFactor(4); configParams.setSnapshotBatchCount(snapshotBatchCount); configParams.setSnapshotDataThresholdPercentage(70); configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); @@ -225,10 +247,10 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest RaftActorTestKit.waitUntilLeader(actorRef); } - protected TestActorRef newTestRaftActor(String id, Map peerAddresses, + protected TestActorRef newTestRaftActor(String id, Map newPeerAddresses, ConfigParams configParams) { - return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(peerAddresses != null ? peerAddresses : - Collections.emptyMap()).config(configParams)); + return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(newPeerAddresses != null + ? newPeerAddresses : Collections.emptyMap()).config(configParams)); } protected TestActorRef newTestRaftActor(String id, TestRaftActor.Builder builder) { @@ -237,7 +259,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest factory.generateActorId(id + "-collector"))).id(id); InvalidActorNameException lastEx = null; - for(int i = 0; i < 10; i++) { + for (int i = 0; i < 10; i++) { try { return factory.createTestActor(builder.props().withDispatcher(Dispatchers.DefaultDispatcherId()), id); } catch (InvalidActorNameException e) { @@ -250,21 +272,21 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest throw lastEx; } - protected void killActor(TestActorRef leaderActor) { + protected void killActor(TestActorRef actor) { JavaTestKit testkit = new JavaTestKit(getSystem()); - testkit.watch(leaderActor); + testkit.watch(actor); - leaderActor.tell(PoisonPill.getInstance(), null); + actor.tell(PoisonPill.getInstance(), null); testkit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class); - testkit.unwatch(leaderActor); + testkit.unwatch(actor); } protected void verifyApplyJournalEntries(ActorRef actor, final long expIndex) { - MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, msg -> msg.getToIndex() == expIndex); + MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class, + msg -> msg.getToIndex() == expIndex); } - @SuppressWarnings("unchecked") protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm, long lastAppliedIndex, long lastTerm, long lastIndex) throws Exception { @@ -273,10 +295,10 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm()); assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex()); - List actualState = (List)MockRaftActor.toObject(snapshot.getState()); + List actualState = ((MockSnapshotState)snapshot.getState()).getState(); assertEquals(String.format("%s Snapshot getState size. Expected %s: . Actual: %s", prefix, expSnapshotState, actualState), expSnapshotState.size(), actualState.size()); - for(int i = 0; i < expSnapshotState.size(); i++) { + for (int i = 0; i < expSnapshotState.size(); i++) { assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i)); } } @@ -284,26 +306,26 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest protected void verifyPersistedJournal(String persistenceId, List expJournal) { List journal = InMemoryJournal.get(persistenceId, ReplicatedLogEntry.class); assertEquals("Journal ReplicatedLogEntry count", expJournal.size(), journal.size()); - for(int i = 0; i < expJournal.size(); i++) { + for (int i = 0; i < expJournal.size(); i++) { ReplicatedLogEntry expected = expJournal.get(i); ReplicatedLogEntry actual = journal.get(i); verifyReplicatedLogEntry(expected, actual.getTerm(), actual.getIndex(), actual.getData()); } } - protected MockPayload sendPayloadData(ActorRef leaderActor, String data) { - return sendPayloadData(leaderActor, data, 0); + protected MockPayload sendPayloadData(ActorRef actor, String data) { + return sendPayloadData(actor, data, 0); } - protected MockPayload sendPayloadData(ActorRef leaderActor, String data, int size) { + protected MockPayload sendPayloadData(ActorRef actor, String data, int size) { MockPayload payload; - if(size > 0) { + if (size > 0) { payload = new MockPayload(data, size); } else { payload = new MockPayload(data); } - leaderActor.tell(payload, ActorRef.noSender()); + actor.tell(payload, ActorRef.noSender()); return payload; } @@ -324,7 +346,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest assertEquals("ReplicatedLogEntry getData", payload, replicatedLogEntry.getData()); } - protected String testActorPath(String id){ + protected String testActorPath(String id) { return factory.createTestActorPath(id); } @@ -355,4 +377,27 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest assertEquals(name + " replicatedToAllIndex", replicatedToAllIndex, actor.getCurrentBehavior().getReplicatedToAllIndex()); } + + @SuppressWarnings("checkstyle:IllegalCatch") + static void verifyRaftState(ActorRef raftActor, Consumer verifier) { + Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS); + AssertionError lastError = null; + Stopwatch sw = Stopwatch.createStarted(); + while (sw.elapsed(TimeUnit.SECONDS) <= 5) { + try { + OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor, + GetOnDemandRaftState.INSTANCE, timeout), timeout.duration()); + verifier.accept(raftState); + return; + } catch (AssertionError e) { + lastError = e; + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } catch (Exception e) { + lastError = new AssertionError("OnDemandRaftState failed", e); + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + } + + throw lastError; + } }