package org.opendaylight.controller.cluster.raft;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import akka.japi.Procedure;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.persisted.ByteState;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private ActorSystem system;
private RaftPolicy raftPolicy;
+ private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
private static ElectionTerm newElectionTerm() {
return new ElectionTerm() {
}
@Override
- public void update(long newTerm, String newVotedFor) {
- this.currentTerm = newTerm;
- this.votedFor = newVotedFor;
+ public void update(final long newTerm, final String newVotedFor) {
+ currentTerm = newTerm;
+ votedFor = newVotedFor;
// TODO : Write to some persistent state
}
- @Override public void updateAndPersist(long newTerm, String newVotedFor) {
+ @Override public void updateAndPersist(final long newTerm, final String newVotedFor) {
update(newTerm, newVotedFor);
}
};
}
+ private static DataPersistenceProvider createProvider() {
+ return new NonPersistentDataProvider(Runnable::run);
+ }
+
public MockRaftActorContext() {
super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
+ new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG,
+ MoreExecutors.directExecutor());
setReplicatedLog(new MockReplicatedLogBuilder().build());
}
- public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
+ public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
- applyState -> actor.tell(applyState, actor), LOG);
+ new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG,
+ MoreExecutors.directExecutor());
this.system = system;
setLastApplied(replicatedLog.lastIndex());
}
- @Override public ActorRef actorOf(Props props) {
+ @Override public ActorRef actorOf(final Props props) {
return system.actorOf(props);
}
- @Override public ActorSelection actorSelection(String path) {
+ @Override public ActorSelection actorSelection(final String path) {
return system.actorSelection(path);
}
@Override public ActorSystem getActorSystem() {
- return this.system;
+ return system;
}
- @Override public ActorSelection getPeerActorSelection(String peerId) {
+ @Override public ActorSelection getPeerActorSelection(final String peerId) {
String peerAddress = getPeerAddress(peerId);
if (peerAddress != null) {
return actorSelection(peerAddress);
return null;
}
- public void setPeerAddresses(Map<String, String> peerAddresses) {
+ public void setPeerAddresses(final Map<String, String> peerAddresses) {
for (String id: getPeerIds()) {
removePeer(id);
}
@Override
public SnapshotManager getSnapshotManager() {
SnapshotManager snapshotManager = super.getSnapshotManager();
- snapshotManager.setCreateSnapshotConsumer(out -> { });
+ snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
@Override
- public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+ public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
return ByteState.of(snapshotBytes.read());
}
@Override
- public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
}
@Override
- public void applySnapshot(State snapshotState) {
+ public void applySnapshot(final State snapshotState) {
}
});
return snapshotManager;
}
+ public void setCreateSnapshotProcedure(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
+ this.createSnapshotProcedure = createSnapshotProcedure;
+ }
+
@Override
public RaftPolicy getRaftPolicy() {
return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
}
- public void setRaftPolicy(RaftPolicy raftPolicy) {
+ public void setRaftPolicy(final RaftPolicy raftPolicy) {
this.raftPolicy = raftPolicy;
}
}
@Override
- public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+ public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
}
@Override
- public boolean shouldCaptureSnapshot(long logIndex) {
+ public boolean shouldCaptureSnapshot(final long logIndex) {
return false;
}
@Override
- public boolean removeFromAndPersist(long index) {
+ public boolean removeFromAndPersist(final long index) {
return removeFrom(index) >= 0;
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
- boolean doAsync) {
+ public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+ final Consumer<ReplicatedLogEntry> callback, final boolean doAsync) {
append(replicatedLogEntry);
if (callback != null) {
- try {
- callback.apply(replicatedLogEntry);
- } catch (Exception e) {
- Throwables.propagate(e);
- }
+ callback.accept(replicatedLogEntry);
}
return true;
}
}
- public static class MockPayload extends Payload implements Serializable {
+ public static final class MockPayload extends Payload {
private static final long serialVersionUID = 3121380393130864247L;
- private String value = "";
- private int size;
+
+ private final String data;
+ private final int size;
public MockPayload() {
+ this("");
}
- public MockPayload(String data) {
- this.value = data;
- size = value.length();
+ public MockPayload(final String data) {
+ this(data, data.length());
}
- public MockPayload(String data, int size) {
- this(data);
+ public MockPayload(final String data, final int size) {
+ this.data = requireNonNull(data);
this.size = size;
}
return size;
}
+ @Override
+ public int serializedSize() {
+ return size;
+ }
+
@Override
public String toString() {
- return value;
+ return data;
}
@Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (value == null ? 0 : value.hashCode());
- return result;
+ return data.hashCode();
}
@Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- MockPayload other = (MockPayload) obj;
- if (value == null) {
- if (other.value != null) {
- return false;
- }
- } else if (!value.equals(other.value)) {
- return false;
- }
- return true;
+ public boolean equals(final Object obj) {
+ return this == obj || obj instanceof MockPayload other && Objects.equals(data, other.data)
+ && size == other.size;
+ }
+
+ @Override
+ protected Object writeReplace() {
+ return new MockPayloadProxy(data, size);
+ }
+ }
+
+ private static final class MockPayloadProxy implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String value;
+ private final int size;
+
+ MockPayloadProxy(String value, int size) {
+ this.value = value;
+ this.size = size;
+ }
+
+ Object readResolve() {
+ return new MockPayload(value, size);
}
}
public static class MockReplicatedLogBuilder {
private final ReplicatedLog mockLog = new SimpleReplicatedLog();
- public MockReplicatedLogBuilder createEntries(int start, int end, int term) {
+ public MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) {
for (int i = start; i < end; i++) {
- this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
+ mockLog.append(new SimpleReplicatedLogEntry(i, term,
new MockRaftActorContext.MockPayload(Integer.toString(i))));
}
return this;
}
- public MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
- this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
+ public MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) {
+ mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
return this;
}
public ReplicatedLog build() {
- return this.mockLog;
+ return mockLog;
}
}