Use java.util.function.Supplier instead of Guava
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / AbstractRaftActorIntegrationTest.java
index 3c6c8281fb734b0d378963c831c2f462fbcc3182..b81dcc92acfcc1d781e68c03ad9b8c3a334cee5e 100644 (file)
@@ -7,27 +7,27 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
+import akka.actor.InvalidActorNameException;
 import akka.actor.PoisonPill;
-import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.dispatch.Dispatchers;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import org.junit.After;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.RaftActorTest.MockRaftActor;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -48,22 +48,32 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest {
 
+    public static class SetPeerAddress {
+        private final String peerId;
+        private final String peerAddress;
+
+        public SetPeerAddress(String peerId, String peerAddress) {
+            this.peerId = peerId;
+            this.peerAddress = peerAddress;
+        }
+
+        public String getPeerId() {
+            return peerId;
+        }
+
+        public String getPeerAddress() {
+            return peerAddress;
+        }
+    }
+
     public static class TestRaftActor extends MockRaftActor {
 
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
-        private volatile byte[] snapshot;
 
-        private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
-                TestActorRef<MessageCollectorActor> collectorActor) {
-            super(id, peerAddresses, Optional.of(config), null);
-            this.collectorActor = collectorActor;
-        }
-
-        public static Props props(String id, Map<String, String> peerAddresses, ConfigParams config,
-                TestActorRef<MessageCollectorActor> collectorActor) {
-            return Props.create(TestRaftActor.class, id, peerAddresses, config, collectorActor).
-                    withDispatcher(Dispatchers.DefaultDispatcherId());
+        private TestRaftActor(Builder builder) {
+            super(builder);
+            this.collectorActor = builder.collectorActor;
         }
 
         void startDropMessages(Class<?> msgClass) {
@@ -96,6 +106,12 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
                 return;
             }
 
+            if(message instanceof SetPeerAddress) {
+                setPeerAddress(((SetPeerAddress) message).getPeerId().toString(),
+                        ((SetPeerAddress) message).getPeerAddress());
+                return;
+            }
+
             try {
                 if(!dropMessages.containsKey(message.getClass())) {
                     super.handleCommand(message);
@@ -113,24 +129,37 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         @Override
         public void createSnapshot(ActorRef actorRef) {
-            if(snapshot != null) {
-                getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
+            try {
+                actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
+            } catch (Exception e) {
+                e.printStackTrace();
             }
         }
 
-        @Override
-        public void applyRecoverySnapshot(byte[] bytes) {
+        public ActorRef collectorActor() {
+            return collectorActor;
         }
 
-        void setSnapshot(byte[] snapshot) {
-            this.snapshot = snapshot;
+        public static Builder newBuilder() {
+            return new Builder();
         }
 
-        public ActorRef collectorActor() {
-            return collectorActor;
+        public static class Builder extends AbstractBuilder<Builder, TestRaftActor> {
+            private TestActorRef<MessageCollectorActor> collectorActor;
+
+            public Builder collectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
+                this.collectorActor = collectorActor;
+                return this;
+            }
+
+            private Builder() {
+                super(TestRaftActor.class);
+            }
         }
     }
 
+    protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+
     protected final Logger testLog = LoggerFactory.getLogger(getClass());
 
     protected final TestActorFactory factory = new TestActorFactory(getSystem());
@@ -159,6 +188,10 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     protected long initialTerm = 5;
     protected long currentTerm;
 
+    protected int snapshotBatchCount = 4;
+
+    protected List<MockPayload> expSnapshotState = new ArrayList<>();
+
     @After
     public void tearDown() {
         InMemoryJournal.clear();
@@ -170,9 +203,10 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(1);
-        configParams.setSnapshotBatchCount(4);
+        configParams.setSnapshotBatchCount(snapshotBatchCount);
         configParams.setSnapshotDataThresholdPercentage(70);
         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        configParams.setSnapshotChunkSize(SNAPSHOT_CHUNK_SIZE);
         return configParams;
     }
 
@@ -184,17 +218,32 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     }
 
     protected void waitUntilLeader(ActorRef actorRef) {
-        RaftActorTest.RaftActorTestKit.waitUntilLeader(actorRef);
+        RaftActorTestKit.waitUntilLeader(actorRef);
     }
 
     protected TestActorRef<TestRaftActor> newTestRaftActor(String id, Map<String, String> peerAddresses,
             ConfigParams configParams) {
-        TestActorRef<MessageCollectorActor> collectorActor = factory.createTestActor(
+        return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(peerAddresses != null ? peerAddresses :
+            Collections.<String, String>emptyMap()).config(configParams));
+    }
+
+    protected TestActorRef<TestRaftActor> newTestRaftActor(String id, TestRaftActor.Builder builder) {
+        builder.collectorActor(factory.<MessageCollectorActor>createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        factory.generateActorId(id + "-collector"));
-        return factory.createTestActor(TestRaftActor.props(id,
-                peerAddresses != null ? peerAddresses : Collections.<String, String>emptyMap(),
-                        configParams, collectorActor), id);
+                        factory.generateActorId(id + "-collector"))).id(id);
+
+        InvalidActorNameException lastEx = null;
+        for(int i = 0; i < 10; i++) {
+            try {
+                return factory.createTestActor(builder.props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+            } catch (InvalidActorNameException e) {
+                lastEx = e;
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        assertNotNull(lastEx);
+        throw lastEx;
     }
 
     protected void killActor(TestActorRef<TestRaftActor> leaderActor) {
@@ -216,13 +265,21 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         });
     }
 
+    @SuppressWarnings("unchecked")
     protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm,
-            int lastAppliedIndex, long lastTerm, long lastIndex, byte[] data) {
+            long lastAppliedIndex, long lastTerm, long lastIndex)
+                    throws Exception {
         assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
         assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex());
         assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
         assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
-        assertArrayEquals(prefix + " Snapshot getState", data, snapshot.getState());
+
+        List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
+        assertEquals(String.format("%s Snapshot getState size. Expected %s: . Actual: %s", prefix, expSnapshotState,
+                actualState), expSnapshotState.size(), actualState.size());
+        for(int i = 0; i < expSnapshotState.size(); i++) {
+            assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i));
+        }
     }
 
     protected void verifyPersistedJournal(String persistenceId, List<? extends ReplicatedLogEntry> expJournal) {
@@ -267,6 +324,34 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     }
 
     protected String testActorPath(String id){
-        return "akka://test/user" + id;
+        return factory.createTestActorPath(id);
+    }
+
+    protected void verifyLeadersTrimmedLog(long lastIndex) {
+        verifyTrimmedLog("Leader", leaderActor, lastIndex, lastIndex - 1);
+    }
+
+    protected void verifyLeadersTrimmedLog(long lastIndex, long replicatedToAllIndex) {
+        verifyTrimmedLog("Leader", leaderActor, lastIndex, replicatedToAllIndex);
+    }
+
+    protected void verifyFollowersTrimmedLog(int num, TestActorRef<TestRaftActor> actorRef, long lastIndex) {
+        verifyTrimmedLog("Follower " + num, actorRef, lastIndex, lastIndex - 1);
+    }
+
+    protected void verifyTrimmedLog(String name, TestActorRef<TestRaftActor> actorRef, long lastIndex,
+            long replicatedToAllIndex) {
+        TestRaftActor actor = actorRef.underlyingActor();
+        RaftActorContext context = actor.getRaftActorContext();
+        long snapshotIndex = lastIndex - 1;
+        assertEquals(name + " snapshot term", snapshotIndex < 0 ? -1 : currentTerm,
+                context.getReplicatedLog().getSnapshotTerm());
+        assertEquals(name + " snapshot index", snapshotIndex, context.getReplicatedLog().getSnapshotIndex());
+        assertEquals(name + " journal log size", 1, context.getReplicatedLog().size());
+        assertEquals(name + " journal last index", lastIndex, context.getReplicatedLog().lastIndex());
+        assertEquals(name + " commit index", lastIndex, context.getCommitIndex());
+        assertEquals(name + " last applied", lastIndex, context.getLastApplied());
+        assertEquals(name + " replicatedToAllIndex", replicatedToAllIndex,
+                actor.getCurrentBehavior().getReplicatedToAllIndex());
     }
 }