X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=8135d837d3ad86563ad0246022941bfc6134b59d;hp=27e8f34d327dd5ebe9e1b1ca8a9d8e6ea1b6ceba;hb=b10d77375b5a290143106180f1583ea4e18f8478;hpb=c675288c3ebd727fc72013e899183995c3fa9be7 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 27e8f34d32..8135d837d3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -19,10 +19,14 @@ import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; +import com.google.common.base.Optional; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import com.google.common.base.Optional; +import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -31,10 +35,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import java.io.Serializable; -import java.util.List; import java.util.Map; /** @@ -98,6 +103,9 @@ public abstract class RaftActor extends UntypedPersistentActor { */ private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl(); + private CaptureSnapshot captureSnapshot = null; + + private volatile boolean hasSnapshotCaptureInitiated = false; public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); @@ -125,6 +133,7 @@ public abstract class RaftActor extends UntypedPersistentActor { replicatedLog = new ReplicatedLogImpl(snapshot); context.setReplicatedLog(replicatedLog); + context.setLastApplied(snapshot.getLastAppliedIndex()); LOG.debug("Applied snapshot to replicatedLog. " + "snapshotIndex={}, snapshotTerm={}, journal-size={}", @@ -132,7 +141,7 @@ public abstract class RaftActor extends UntypedPersistentActor { replicatedLog.size()); // Apply the snapshot to the actors state - applySnapshot(snapshot.getState()); + applySnapshot(ByteString.copyFrom(snapshot.getState())); } else if (message instanceof ReplicatedLogEntry) { replicatedLog.append((ReplicatedLogEntry) message); @@ -164,7 +173,17 @@ public abstract class RaftActor extends UntypedPersistentActor { applyState.getReplicatedLogEntry().getData()); } else if(message instanceof ApplySnapshot ) { - applySnapshot(((ApplySnapshot) message).getSnapshot()); + Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); + + LOG.debug("ApplySnapshot called on Follower Actor " + + "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(), + snapshot.getLastAppliedTerm()); + applySnapshot(ByteString.copyFrom(snapshot.getState())); + + //clears the followers log, sets the snapshot index to ensure adjusted-index works + replicatedLog = new ReplicatedLogImpl(snapshot); + context.setReplicatedLog(replicatedLog); + context.setLastApplied(snapshot.getLastAppliedIndex()); } else if (message instanceof FindLeader) { getSender().tell( @@ -174,13 +193,26 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof SaveSnapshotSuccess) { SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; + LOG.info("SaveSnapshotSuccess received for snapshot"); + + context.getReplicatedLog().snapshotCommit(); // TODO: Not sure if we want to be this aggressive with trimming stuff trimPersistentData(success.metadata().sequenceNr()); } else if (message instanceof SaveSnapshotFailure) { + SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; + + LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString()); + LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:"); - // TODO: Handle failure in saving the snapshot + context.getReplicatedLog().snapshotRollback(); + + LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().size()); } else if (message instanceof AddRaftPeer){ @@ -196,7 +228,25 @@ public abstract class RaftActor extends UntypedPersistentActor { RemoveRaftPeer rrp = (RemoveRaftPeer)message; context.removePeer(rrp.getName()); + } else if (message instanceof CaptureSnapshot) { + LOG.debug("CaptureSnapshot received by actor"); + CaptureSnapshot cs = (CaptureSnapshot)message; + captureSnapshot = cs; + createSnapshot(); + + } else if (message instanceof CaptureSnapshotReply){ + LOG.debug("CaptureSnapshotReply received by actor"); + CaptureSnapshotReply csr = (CaptureSnapshotReply) message; + + ByteString stateInBytes = csr.getSnapshot(); + LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); + handleCaptureSnapshotReply(stateInBytes); + } else { + if (!(message instanceof AppendEntriesMessages.AppendEntries) + && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) { + LOG.debug("onReceiveCommand: message:" + message.getClass()); + } RaftState state = currentBehavior.handleMessage(getSender(), message); @@ -348,7 +398,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * * @return The current state of the actor */ - protected abstract Object createSnapshot(); + protected abstract void createSnapshot(); /** * This method will be called by the RaftActor during recovery to @@ -360,7 +410,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * * @param snapshot A snapshot of the state of the actor */ - protected abstract void applySnapshot(Object snapshot); + protected abstract void applySnapshot(ByteString snapshot); /** * This method will be called by the RaftActor when the state of the @@ -427,11 +477,39 @@ public abstract class RaftActor extends UntypedPersistentActor { return peerAddress; } + private void handleCaptureSnapshotReply(ByteString stateInBytes) { + // create a snapshot object from the state provided and save it + // when snapshot is saved async, SaveSnapshotSuccess is raised. + + Snapshot sn = Snapshot.create(stateInBytes.toByteArray(), + context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), + captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), + captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); + + saveSnapshot(sn); + + LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); + + //be greedy and remove entries from in-mem journal which are in the snapshot + // and update snapshotIndex and snapshotTerm without waiting for the success, + + context.getReplicatedLog().snapshotPreCommit(stateInBytes, + captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + + "and term:{}", captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + captureSnapshot = null; + hasSnapshotCaptureInitiated = false; + } + private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public ReplicatedLogImpl(Snapshot snapshot) { - super(snapshot.getState(), + super(ByteString.copyFrom(snapshot.getState()), snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), snapshot.getUnAppliedEntries()); } @@ -480,8 +558,10 @@ public abstract class RaftActor extends UntypedPersistentActor { persist(replicatedLogEntry, new Procedure() { public void apply(ReplicatedLogEntry evt) throws Exception { - // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned. - if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) { + // when a snaphsot is being taken, captureSnapshot != null + if (hasSnapshotCaptureInitiated == false && + journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) { + LOG.info("Initiating Snapshot Capture.."); long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -497,26 +577,11 @@ public abstract class RaftActor extends UntypedPersistentActor { LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); - // create a snapshot object from the state provided and save it - // when snapshot is saved async, SaveSnapshotSuccess is raised. - Snapshot sn = Snapshot.create(createSnapshot(), - getFrom(context.getLastApplied() + 1), - lastIndex(), lastTerm(), lastAppliedIndex, - lastAppliedTerm); - saveSnapshot(sn); - - LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); - - //be greedy and remove entries from in-mem journal which are in the snapshot - // and update snapshotIndex and snapshotTerm without waiting for the success, - // TODO: damage-recovery to be done on failure - journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear(); - snapshotIndex = lastAppliedIndex; - snapshotTerm = lastAppliedTerm; - - LOG.info("Removed in-memory snapshotted entries, " + - "adjusted snaphsotIndex:{}" + - "and term:{}", snapshotIndex, lastAppliedTerm); + // send a CaptureSnapshot to self to make the expensive operation async. + getSelf().tell(new CaptureSnapshot( + lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm), + null); + hasSnapshotCaptureInitiated = true; } // Send message for replication if (clientActor != null) { @@ -546,65 +611,6 @@ public abstract class RaftActor extends UntypedPersistentActor { } - private static class Snapshot implements Serializable { - private final Object state; - private final List unAppliedEntries; - private final long lastIndex; - private final long lastTerm; - private final long lastAppliedIndex; - private final long lastAppliedTerm; - - private Snapshot(Object state, - List unAppliedEntries, long lastIndex, - long lastTerm, long lastAppliedIndex, long lastAppliedTerm) { - this.state = state; - this.unAppliedEntries = unAppliedEntries; - this.lastIndex = lastIndex; - this.lastTerm = lastTerm; - this.lastAppliedIndex = lastAppliedIndex; - this.lastAppliedTerm = lastAppliedTerm; - } - - - public static Snapshot create(Object state, - List entries, long lastIndex, long lastTerm, - long lastAppliedIndex, long lastAppliedTerm) { - return new Snapshot(state, entries, lastIndex, lastTerm, - lastAppliedIndex, lastAppliedTerm); - } - - public Object getState() { - return state; - } - - public List getUnAppliedEntries() { - return unAppliedEntries; - } - - public long getLastTerm() { - return lastTerm; - } - - public long getLastAppliedIndex() { - return lastAppliedIndex; - } - - public long getLastAppliedTerm() { - return lastAppliedTerm; - } - - public String getLogMessage() { - StringBuilder sb = new StringBuilder(); - return sb.append("Snapshot={") - .append("lastTerm:" + this.getLastTerm() + ", ") - .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ") - .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ") - .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}") - .toString(); - - } - } - private class ElectionTermImpl implements ElectionTerm { /** * Identifier of the actor whose election term information this is