import akka.actor.Props;
import akka.japi.Procedure;
import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.OutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
private ActorSystem system;
private RaftPolicy raftPolicy;
+ private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
private static ElectionTerm newElectionTerm() {
return new ElectionTerm() {
public MockRaftActorContext() {
super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
+ new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
setReplicatedLog(new MockReplicatedLogBuilder().build());
}
public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
+ new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
+ applyState -> actor.tell(applyState, actor), LOG);
this.system = system;
public void initReplicatedLog() {
SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
long term = getTermInformation().getCurrentTerm();
- replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1")));
- replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(0, term, new MockPayload("1")));
+ replicatedLog.append(new SimpleReplicatedLogEntry(1, term, new MockPayload("2")));
setReplicatedLog(replicatedLog);
setCommitIndex(replicatedLog.lastIndex());
setLastApplied(replicatedLog.lastIndex());
@Override
public SnapshotManager getSnapshotManager() {
SnapshotManager snapshotManager = super.getSnapshotManager();
- snapshotManager.setCreateSnapshotRunnable(() -> { });
+ snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
+
+ snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
+ @Override
+ public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+ return ByteState.of(snapshotBytes.read());
+ }
+
+ @Override
+ public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ }
+
+ @Override
+ public void applySnapshot(State snapshotState) {
+ }
+ });
+
return snapshotManager;
}
+ public void setCreateSnapshotProcedure(Consumer<Optional<OutputStream>> createSnapshotProcedure) {
+ this.createSnapshotProcedure = createSnapshotProcedure;
+ }
+
@Override
public RaftPolicy getRaftPolicy() {
return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
}
+ @Override
+ public boolean shouldCaptureSnapshot(long logIndex) {
+ return false;
+ }
+
@Override
public boolean removeFromAndPersist(long index) {
return removeFrom(index) >= 0;
}
}
- // TODO - this class can be removed and use ReplicatedLogImplEntry directly.
- public static class MockReplicatedLogEntry extends SimpleReplicatedLogEntry {
- private static final long serialVersionUID = 1L;
-
- public MockReplicatedLogEntry(long term, long index, Payload data) {
- super(index, term, data);
- }
- }
-
public static class MockReplicatedLogBuilder {
private final ReplicatedLog mockLog = new SimpleReplicatedLog();