X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=c15c9198bd17c2a3da0fa54b017379a5e8971b13;hp=12123db12995061901a39a264c79f0237d78d00a;hb=b5167b9bc04f2792b275cfe0eac78c0f5eb9442d;hpb=4ef563c481b83e360e688a59ac346b8328870d58 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 12123db129..c15c9198bd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -2,42 +2,124 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Terminated; import akka.event.Logging; import akka.japi.Creator; import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import com.google.common.base.Optional; import com.google.protobuf.ByteString; +import org.junit.After; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; - +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; +import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; +import scala.concurrent.duration.FiniteDuration; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; - -import static junit.framework.TestCase.assertEquals; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; public class RaftActorTest extends AbstractActorTest { + @After + public void tearDown() { + MockAkkaJournal.clearJournal(); + MockSnapshotStore.setMockSnapshot(null); + } + public static class MockRaftActor extends RaftActor { - public MockRaftActor(String id, - Map peerAddresses) { - super(id, peerAddresses); + public static final class MockRaftActorCreator implements Creator { + private final Map peerAddresses; + private final String id; + private final Optional config; + + private MockRaftActorCreator(Map peerAddresses, String id, + Optional config) { + this.peerAddresses = peerAddresses; + this.id = id; + this.config = config; + } + + @Override + public MockRaftActor create() throws Exception { + return new MockRaftActor(id, peerAddresses, config); + } } - public static Props props(final String id, final Map peerAddresses){ - return Props.create(new Creator(){ + private final CountDownLatch recoveryComplete = new CountDownLatch(1); + private final List state; - @Override public MockRaftActor create() throws Exception { - return new MockRaftActor(id, peerAddresses); - } - }); + public MockRaftActor(String id, Map peerAddresses, Optional config) { + super(id, peerAddresses, config); + state = new ArrayList<>(); + } + + public void waitForRecoveryComplete() { + try { + assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public List getState() { + return state; + } + + public static Props props(final String id, final Map peerAddresses, + Optional config){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config)); + } + + @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { + } + + @Override + protected void startLogRecoveryBatch(int maxBatchSize) { + } + + @Override + protected void appendRecoveredLogEntry(Payload data) { + state.add(data); + } + + @Override + protected void applyCurrentLogRecoveryBatch() { + } + + @Override + protected void onRecoveryComplete() { + recoveryComplete.countDown(); } - @Override protected void applyState(ActorRef clientActor, - String identifier, - Object data) { + @Override + protected void applyRecoverySnapshot(ByteString snapshot) { + try { + Object data = toObject(snapshot); + System.out.println("!!!!!applyRecoverySnapshot: "+data); + if (data instanceof List) { + state.addAll((List) data); + } + } catch (Exception e) { + e.printStackTrace(); + } } @Override protected void createSnapshot() { @@ -45,7 +127,6 @@ public class RaftActorTest extends AbstractActorTest { } @Override protected void applySnapshot(ByteString snapshot) { - throw new UnsupportedOperationException("applySnapshot"); } @Override protected void onStateChanged() { @@ -55,6 +136,26 @@ public class RaftActorTest extends AbstractActorTest { return this.getId(); } + private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + Object obj = null; + ByteArrayInputStream bis = null; + ObjectInputStream ois = null; + try { + bis = new ByteArrayInputStream(bs.toByteArray()); + ois = new ObjectInputStream(bis); + obj = ois.readObject(); + } finally { + if (bis != null) { + bis.close(); + } + if (ois != null) { + ois.close(); + } + } + return obj; + } + + } @@ -64,9 +165,8 @@ public class RaftActorTest extends AbstractActorTest { public RaftActorTestKit(ActorSystem actorSystem, String actorName) { super(actorSystem); - raftActor = this.getSystem() - .actorOf(MockRaftActor.props(actorName, - Collections.EMPTY_MAP), actorName); + raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName, + Collections.EMPTY_MAP, Optional.absent()), actorName); } @@ -76,48 +176,27 @@ public class RaftActorTest extends AbstractActorTest { return new JavaTestKit.EventFilter(Logging.Info.class ) { + @Override protected Boolean run() { return true; } }.from(raftActor.path().toString()) - .message("Switching from state Candidate to Leader") + .message("Switching from behavior Candidate to Leader") .occurrences(1).exec(); } public void findLeader(final String expectedLeader){ + raftActor.tell(new FindLeader(), getRef()); - - new Within(duration("1 seconds")) { - protected void run() { - - raftActor.tell(new FindLeader(), getRef()); - - String s = new ExpectMsg(duration("1 seconds"), - "findLeader") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof FindLeaderReply) { - return ((FindLeaderReply) in).getLeaderActor(); - } else { - throw noMatch(); - } - } - }.get();// this extracts the received message - - assertEquals(expectedLeader, s); - - } - - - }; + FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class); + assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor()); } public ActorRef getRaftActor() { return raftActor; } - } @@ -134,5 +213,104 @@ public class RaftActorTest extends AbstractActorTest { kit.findLeader(kit.getRaftActor().path().toString()); } + @Test + public void testRaftActorRecovery() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = "follower10"; + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + // Set the heartbeat interval high to essentially disable election otherwise the test + // may fail if the actor is switched to Leader and the commitIndex is set to the last + // log entry. + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId, + Collections.EMPTY_MAP, Optional.of(config)), persistenceId); + + watch(followerActor); + + List snapshotUnappliedEntries = new ArrayList<>(); + ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, + new MockRaftActorContext.MockPayload("E")); + snapshotUnappliedEntries.add(entry1); + + int lastAppliedDuringSnapshotCapture = 3; + int lastIndexDuringSnapshotCapture = 4; + + // 4 messages as part of snapshot, which are applied to state + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); + + Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), + snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 , + lastAppliedDuringSnapshotCapture, 1); + MockSnapshotStore.setMockSnapshot(snapshot); + MockSnapshotStore.setPersistenceId(persistenceId); + + // add more entries after snapshot is taken + List entries = new ArrayList<>(); + ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, + new MockRaftActorContext.MockPayload("F")); + ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("G")); + ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, + new MockRaftActorContext.MockPayload("H")); + entries.add(entry2); + entries.add(entry3); + entries.add(entry4); + + int lastAppliedToState = 5; + int lastIndex = 7; + + MockAkkaJournal.addToJournal(5, entry2); + // 2 entries are applied to state besides the 4 entries in snapshot + MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState)); + MockAkkaJournal.addToJournal(7, entry3); + MockAkkaJournal.addToJournal(8, entry4); + + // kill the actor + followerActor.tell(PoisonPill.getInstance(), null); + expectMsgClass(duration("5 seconds"), Terminated.class); + + unwatch(followerActor); + + //reinstate the actor + TestActorRef ref = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, Collections.EMPTY_MAP, + Optional.of(config))); + + ref.underlyingActor().waitForRecoveryComplete(); + + RaftActorContext context = ref.underlyingActor().getRaftActorContext(); + assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(), + context.getReplicatedLog().size()); + assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex()); + assertEquals("Last applied", lastAppliedToState, context.getLastApplied()); + assertEquals("Commit index", lastAppliedToState, context.getCommitIndex()); + assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size()); + }}; + } + private ByteString fromObject(Object snapshot) throws Exception { + ByteArrayOutputStream b = null; + ObjectOutputStream o = null; + try { + b = new ByteArrayOutputStream(); + o = new ObjectOutputStream(b); + o.writeObject(snapshot); + byte[] snapshotBytes = b.toByteArray(); + return ByteString.copyFrom(snapshotBytes); + } finally { + if (o != null) { + o.flush(); + o.close(); + } + if (b != null) { + b.close(); + } + } + } }