X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=3dc6ae469a932474effca186c956a7a6f4ec8631;hb=cd6e12ee194c64a7948424db958e053bf2438f13;hp=66a46ef3bde0ca8a8d1fa8fe1056ccb463a594d6;hpb=73e969cf365dd78772596c71e940ae44fe2f22d3;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 66a46ef3bd..b74259d485 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -10,36 +10,47 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; -import akka.persistence.UntypedPersistentActor; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; -import com.google.protobuf.ByteString; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; -import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; -import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; -import java.io.Serializable; -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * RaftActor encapsulates a state machine that needs to be kept synchronized @@ -81,9 +92,19 @@ import java.util.Map; *
@@ -485,7 +605,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
context.setPeerAddress(peerId, peerAddress);
}
-
+ protected void commitSnapshot(long sequenceNumber) {
+ context.getSnapshotManager().commit(persistence(), sequenceNumber);
+ }
/**
* The applyState method will be called by the RaftActor when some data
@@ -515,7 +637,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
/**
* This method is called during recovery to append state data to the current batch. This method
- * is called 1 or more times after {@link #startRecoveryStateBatch}.
+ * is called 1 or more times after {@link #startLogRecoveryBatch}.
*
* @param data the state data
*/
@@ -524,13 +646,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
/**
* This method is called during recovery to reconstruct the state of the actor.
*
- * @param snapshot A snapshot of the state of the actor
+ * @param snapshotBytes A snapshot of the state of the actor
*/
- protected abstract void applyRecoverySnapshot(ByteString snapshot);
+ protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
/**
* This method is called during recovery at the end of a batch to apply the current batched
- * log entries. This method is called after {@link #appendRecoveryLogEntry}.
+ * log entries. This method is called after {@link #appendRecoveredLogEntry}.
*/
protected abstract void applyCurrentLogRecoveryBatch();
@@ -555,9 +677,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
* operations when the derived actor is out of sync with it's peers
* and the only way to bring it in sync is by applying a snapshot
*
- * @param snapshot A snapshot of the state of the actor
+ * @param snapshotBytes A snapshot of the state of the actor
*/
- protected abstract void applySnapshot(ByteString snapshot);
+ protected abstract void applySnapshot(byte[] snapshotBytes);
/**
* This method will be called by the RaftActor when the state of the
@@ -566,18 +688,15 @@ public abstract class RaftActor extends UntypedPersistentActor {
*/
protected abstract void onStateChanged();
- protected void onLeaderChanged(String oldLeader, String newLeader){};
+ protected abstract DataPersistenceProvider persistence();
- private void trimPersistentData(long sequenceNumber) {
- // Trim akka snapshots
- // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
- // For now guessing that it is ANDed.
- deleteSnapshots(new SnapshotSelectionCriteria(
- sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+ /**
+ * Notifier Actor for this RaftActor to notify when a role change happens
+ * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
+ */
+ protected abstract Optional
+ *
+ * @param o
+ */
+ @Override
+ public void saveSnapshot(Object o) {
+ // Make saving Snapshot successful
+ // Committing the snapshot here would end up calling commit in the creating state which would
+ // be a state violation. That's why now we send a message to commit the snapshot.
+ self().tell(COMMIT_SNAPSHOT, self());
+ }
+ }
+
+
+ private class CreateSnapshotProcedure implements Procedure