X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FMockRaftActorContext.java;h=6d4ec22e3d6be213f9a5f09d9b62042b9649d527;hb=bdf08208d05dd17fae239a6951a96e076e2155af;hp=5104fde125c518b86e50fe63d68a93cc087051d8;hpb=ac5bcae15c3c9831f2879ae44c66b39130e3f2fe;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 5104fde125..6d4ec22e3d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -8,23 +8,30 @@ package org.opendaylight.controller.cluster.raft; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.japi.Procedure; - -import com.google.protobuf.GeneratedMessage; - +import com.google.common.io.ByteSource; +import com.google.common.util.concurrent.MoreExecutors; +import java.io.IOException; +import java.io.OutputStream; import java.io.Serializable; import java.util.HashMap; import java.util.Map; - +import java.util.Objects; +import java.util.Optional; +import java.util.function.Consumer; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.messages.Payload; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; -import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +40,7 @@ public class MockRaftActorContext extends RaftActorContextImpl { private ActorSystem system; private RaftPolicy raftPolicy; + private Consumer> createSnapshotProcedure = out -> { }; private static ElectionTerm newElectionTerm() { return new ElectionTerm() { @@ -50,28 +58,34 @@ public class MockRaftActorContext extends RaftActorContextImpl { } @Override - public void update(long currentTerm, String votedFor){ - this.currentTerm = currentTerm; - this.votedFor = votedFor; + public void update(final long newTerm, final String newVotedFor) { + currentTerm = newTerm; + votedFor = newVotedFor; // TODO : Write to some persistent state } - @Override public void updateAndPersist(long currentTerm, - String votedFor) { - update(currentTerm, votedFor); + @Override public void updateAndPersist(final long newTerm, final String newVotedFor) { + update(newTerm, newVotedFor); } }; } - public MockRaftActorContext(){ - super(null, null, "test", newElectionTerm(), -1, -1, new HashMap(), - new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG); + private static DataPersistenceProvider createProvider() { + return new NonPersistentDataProvider(Runnable::run); + } + + public MockRaftActorContext() { + super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(), + new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG, + MoreExecutors.directExecutor()); + setReplicatedLog(new MockReplicatedLogBuilder().build()); } - public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){ - super(actor, null, id, newElectionTerm(), -1, -1, new HashMap(), - new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG); + public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) { + super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(), + new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG, + MoreExecutors.directExecutor()); this.system = system; @@ -79,40 +93,42 @@ public class MockRaftActorContext extends RaftActorContextImpl { } - public void initReplicatedLog(){ + public void initReplicatedLog() { SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog(); long term = getTermInformation().getCurrentTerm(); - replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1"))); - replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2"))); + replicatedLog.append(new SimpleReplicatedLogEntry(0, term, new MockPayload("1"))); + replicatedLog.append(new SimpleReplicatedLogEntry(1, term, new MockPayload("2"))); setReplicatedLog(replicatedLog); + setCommitIndex(replicatedLog.lastIndex()); + setLastApplied(replicatedLog.lastIndex()); } - @Override public ActorRef actorOf(Props props) { + @Override public ActorRef actorOf(final Props props) { return system.actorOf(props); } - @Override public ActorSelection actorSelection(String path) { + @Override public ActorSelection actorSelection(final String path) { return system.actorSelection(path); } @Override public ActorSystem getActorSystem() { - return this.system; + return system; } - @Override public ActorSelection getPeerActorSelection(String peerId) { + @Override public ActorSelection getPeerActorSelection(final String peerId) { String peerAddress = getPeerAddress(peerId); - if(peerAddress != null){ + if (peerAddress != null) { return actorSelection(peerAddress); } return null; } - public void setPeerAddresses(Map peerAddresses) { - for(String id: getPeerIds()) { + public void setPeerAddresses(final Map peerAddresses) { + for (String id: getPeerIds()) { removePeer(id); } - for(Map.Entry e: peerAddresses.entrySet()) { + for (Map.Entry e: peerAddresses.entrySet()) { addToPeers(e.getKey(), e.getValue(), VotingState.VOTING); } } @@ -120,224 +136,163 @@ public class MockRaftActorContext extends RaftActorContextImpl { @Override public SnapshotManager getSnapshotManager() { SnapshotManager snapshotManager = super.getSnapshotManager(); - snapshotManager.setCreateSnapshotCallable(NoopProcedure.instance()); + snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure); + + snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() { + @Override + public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException { + return ByteState.of(snapshotBytes.read()); + } + + @Override + public void createSnapshot(final ActorRef actorRef, final Optional installSnapshotStream) { + } + + @Override + public void applySnapshot(final State snapshotState) { + } + }); + return snapshotManager; } + public void setCreateSnapshotProcedure(final Consumer> createSnapshotProcedure) { + this.createSnapshotProcedure = createSnapshotProcedure; + } + @Override public RaftPolicy getRaftPolicy() { return raftPolicy != null ? raftPolicy : super.getRaftPolicy(); } - public void setRaftPolicy(RaftPolicy raftPolicy) { + public void setRaftPolicy(final RaftPolicy raftPolicy) { this.raftPolicy = raftPolicy; } public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl { @Override - public void appendAndPersist( - ReplicatedLogEntry replicatedLogEntry) { - append(replicatedLogEntry); + public int dataSize() { + return -1; } @Override - public int dataSize() { - return -1; + public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) { } @Override - public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) { + public boolean shouldCaptureSnapshot(final long logIndex) { + return false; } - @Override public void removeFromAndPersist(long index) { - removeFrom(index); + @Override + public boolean removeFromAndPersist(final long index) { + return removeFrom(index) >= 0; } @Override - public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback) { + @SuppressWarnings("checkstyle:IllegalCatch") + public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry, + final Consumer callback, final boolean doAsync) { append(replicatedLogEntry); - if(callback != null) { - try { - callback.apply(replicatedLogEntry); - } catch (Exception e) { - e.printStackTrace(); - } + if (callback != null) { + callback.accept(replicatedLogEntry); } + + return true; } } - public static class MockPayload extends Payload implements Serializable { + public static final class MockPayload extends Payload { private static final long serialVersionUID = 3121380393130864247L; - private String value = ""; - private int size; + + private final String data; + private final int size; public MockPayload() { + this(""); } - public MockPayload(String s) { - this.value = s; - size = value.length(); + public MockPayload(final String data) { + this(data, data.length()); } - public MockPayload(String s, int size) { - this(s); + public MockPayload(final String data, final int size) { + this.data = requireNonNull(data); this.size = size; } - @Override public Map, String> encode() { - Map, String> map = new HashMap<>(); - map.put(MockPayloadMessages.value, value); - return map; - } - - @Override public Payload decode( - AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payloadProtoBuff) { - String value = payloadProtoBuff.getExtension(MockPayloadMessages.value); - this.value = value; - return this; - } - @Override public int size() { return size; } - @Override public String getClientPayloadClassName() { - return MockPayload.class.getName(); + @Override + public int serializedSize() { + return size; } @Override public String toString() { - return value; + return data; } @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((value == null) ? 0 : value.hashCode()); - return result; + return data.hashCode(); } @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - MockPayload other = (MockPayload) obj; - if (value == null) { - if (other.value != null) { - return false; - } - } else if (!value.equals(other.value)) { - return false; - } - return true; - } - } - - public static class MockReplicatedLogEntry implements ReplicatedLogEntry, Serializable { - private static final long serialVersionUID = 1L; - - private final long term; - private final long index; - private final Payload data; - - public MockReplicatedLogEntry(long term, long index, Payload data){ - - this.term = term; - this.index = index; - this.data = data; - } - - @Override public Payload getData() { - return data; - } - - @Override public long getTerm() { - return term; - } - - @Override public long getIndex() { - return index; + public boolean equals(final Object obj) { + return this == obj || obj instanceof MockPayload other && Objects.equals(data, other.data) + && size == other.size; } @Override - public int size() { - return getData().size(); + protected Object writeReplace() { + return new MockPayloadProxy(data, size); } + } - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((data == null) ? 0 : data.hashCode()); - result = prime * result + (int) (index ^ (index >>> 32)); - result = prime * result + (int) (term ^ (term >>> 32)); - return result; - } + private static final class MockPayloadProxy implements Serializable { + private static final long serialVersionUID = 1L; - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - MockReplicatedLogEntry other = (MockReplicatedLogEntry) obj; - if (data == null) { - if (other.data != null) { - return false; - } - } else if (!data.equals(other.data)) { - return false; - } - if (index != other.index) { - return false; - } - if (term != other.term) { - return false; - } - return true; + private final String value; + private final int size; + + MockPayloadProxy(String value, int size) { + this.value = value; + this.size = size; } - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("MockReplicatedLogEntry [term=").append(term).append(", index=").append(index) - .append(", data=").append(data).append("]"); - return builder.toString(); + Object readResolve() { + return new MockPayload(value, size); } } public static class MockReplicatedLogBuilder { private final ReplicatedLog mockLog = new SimpleReplicatedLog(); - public MockReplicatedLogBuilder createEntries(int start, int end, int term) { - for (int i=start; i