X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FMockRaftActorContext.java;h=2e05a7e5708b49381082353518b81cd299521f4f;hb=ed0c0135e2563fbbfcec41975338cece15c62cc2;hp=0bb2e8a8fc0af5d643a2fa335ee7bc98acfece78;hpb=79e6240ad565717e2fba62a339f11fcbd239f440;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 0bb2e8a8fc..2e05a7e570 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -13,13 +13,20 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import akka.japi.Procedure; -import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.OutputStream; import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; @@ -30,6 +37,7 @@ public class MockRaftActorContext extends RaftActorContextImpl { private ActorSystem system; private RaftPolicy raftPolicy; + private Consumer> createSnapshotProcedure = out -> { }; private static ElectionTerm newElectionTerm() { return new ElectionTerm() { @@ -47,28 +55,32 @@ public class MockRaftActorContext extends RaftActorContextImpl { } @Override - public void update(long newTerm, String newVotedFor) { + public void update(final long newTerm, final String newVotedFor) { this.currentTerm = newTerm; this.votedFor = newVotedFor; // TODO : Write to some persistent state } - @Override public void updateAndPersist(long newTerm, String newVotedFor) { + @Override public void updateAndPersist(final long newTerm, final String newVotedFor) { update(newTerm, newVotedFor); } }; } + private static DataPersistenceProvider createProvider() { + return new NonPersistentDataProvider(Runnable::run); + } + public MockRaftActorContext() { super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(), - new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG); + new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG); setReplicatedLog(new MockReplicatedLogBuilder().build()); } - public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) { + public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) { super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(), - new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG); + new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG); this.system = system; @@ -79,18 +91,18 @@ public class MockRaftActorContext extends RaftActorContextImpl { public void initReplicatedLog() { SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog(); long term = getTermInformation().getCurrentTerm(); - replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1"))); - replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2"))); + replicatedLog.append(new SimpleReplicatedLogEntry(0, term, new MockPayload("1"))); + replicatedLog.append(new SimpleReplicatedLogEntry(1, term, new MockPayload("2"))); setReplicatedLog(replicatedLog); setCommitIndex(replicatedLog.lastIndex()); setLastApplied(replicatedLog.lastIndex()); } - @Override public ActorRef actorOf(Props props) { + @Override public ActorRef actorOf(final Props props) { return system.actorOf(props); } - @Override public ActorSelection actorSelection(String path) { + @Override public ActorSelection actorSelection(final String path) { return system.actorSelection(path); } @@ -98,7 +110,7 @@ public class MockRaftActorContext extends RaftActorContextImpl { return this.system; } - @Override public ActorSelection getPeerActorSelection(String peerId) { + @Override public ActorSelection getPeerActorSelection(final String peerId) { String peerAddress = getPeerAddress(peerId); if (peerAddress != null) { return actorSelection(peerAddress); @@ -106,7 +118,7 @@ public class MockRaftActorContext extends RaftActorContextImpl { return null; } - public void setPeerAddresses(Map peerAddresses) { + public void setPeerAddresses(final Map peerAddresses) { for (String id: getPeerIds()) { removePeer(id); } @@ -119,16 +131,36 @@ public class MockRaftActorContext extends RaftActorContextImpl { @Override public SnapshotManager getSnapshotManager() { SnapshotManager snapshotManager = super.getSnapshotManager(); - snapshotManager.setCreateSnapshotRunnable(() -> { }); + snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure); + + snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() { + @Override + public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException { + return ByteState.of(snapshotBytes.read()); + } + + @Override + public void createSnapshot(final ActorRef actorRef, final Optional installSnapshotStream) { + } + + @Override + public void applySnapshot(final State snapshotState) { + } + }); + return snapshotManager; } + public void setCreateSnapshotProcedure(final Consumer> createSnapshotProcedure) { + this.createSnapshotProcedure = createSnapshotProcedure; + } + @Override public RaftPolicy getRaftPolicy() { return raftPolicy != null ? raftPolicy : super.getRaftPolicy(); } - public void setRaftPolicy(RaftPolicy raftPolicy) { + public void setRaftPolicy(final RaftPolicy raftPolicy) { this.raftPolicy = raftPolicy; } @@ -139,25 +171,32 @@ public class MockRaftActorContext extends RaftActorContextImpl { } @Override - public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) { + public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) { + } + + @Override + public boolean shouldCaptureSnapshot(final long logIndex) { + return false; } @Override - public boolean removeFromAndPersist(long index) { + public boolean removeFromAndPersist(final long index) { return removeFrom(index) >= 0; } @Override @SuppressWarnings("checkstyle:IllegalCatch") - public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback, - boolean doAsync) { + public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry, + final Procedure callback, final boolean doAsync) { append(replicatedLogEntry); if (callback != null) { try { callback.apply(replicatedLogEntry); + } catch (RuntimeException e) { + throw e; } catch (Exception e) { - Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -173,12 +212,12 @@ public class MockRaftActorContext extends RaftActorContextImpl { public MockPayload() { } - public MockPayload(String data) { + public MockPayload(final String data) { this.value = data; size = value.length(); } - public MockPayload(String data, int size) { + public MockPayload(final String data, final int size) { this(data); this.size = size; } @@ -202,7 +241,7 @@ public class MockRaftActorContext extends RaftActorContextImpl { } @Override - public boolean equals(Object obj) { + public boolean equals(final Object obj) { if (this == obj) { return true; } @@ -224,19 +263,10 @@ public class MockRaftActorContext extends RaftActorContextImpl { } } - // TODO - this class can be removed and use ReplicatedLogImplEntry directly. - public static class MockReplicatedLogEntry extends SimpleReplicatedLogEntry { - private static final long serialVersionUID = 1L; - - public MockReplicatedLogEntry(long term, long index, Payload data) { - super(index, term, data); - } - } - public static class MockReplicatedLogBuilder { private final ReplicatedLog mockLog = new SimpleReplicatedLog(); - public MockReplicatedLogBuilder createEntries(int start, int end, int term) { + public MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) { for (int i = start; i < end; i++) { this.mockLog.append(new SimpleReplicatedLogEntry(i, term, new MockRaftActorContext.MockPayload(Integer.toString(i)))); @@ -244,7 +274,7 @@ public class MockRaftActorContext extends RaftActorContextImpl { return this; } - public MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) { + public MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) { this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload)); return this; }