Bug 2933: Implemented DataTreeChangeService
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index aa72485187cc9143fbcf6eac5f1adb0b7815b27e..a13b6ff95ab356550bf45bff63665f61dc1e3aec 100644 (file)
@@ -22,7 +22,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
@@ -30,6 +29,9 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
@@ -40,7 +42,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
@@ -98,12 +99,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
 
-    private static final Procedure<ApplyJournalEntries> APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK =
-            new Procedure<ApplyJournalEntries>() {
-                @Override
-                public void apply(ApplyJournalEntries param) throws Exception {
-                }
-            };
+    private static final String COMMIT_SNAPSHOT = "commit_snapshot";
 
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
@@ -119,13 +115,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     private final RaftActorContextImpl context;
 
+    private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
+
+    private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
+
     /**
      * The in-memory journal
      */
     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
 
-    private CaptureSnapshot captureSnapshot = null;
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
@@ -140,7 +138,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
          Optional<ConfigParams> configParams) {
 
         context = new RaftActorContextImpl(this.getSelf(),
-            this.getContext(), id, new ElectionTermImpl(),
+            this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
             -1, -1, replicatedLog, peerAddresses,
             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
             LOG);
@@ -340,7 +338,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
             }
 
-            persistence().persist(applyEntries, APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK);
+            persistence().persist(applyEntries, NoopProcedure.instance());
 
         } else if(message instanceof ApplySnapshot ) {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
@@ -379,26 +377,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
                     persistenceId(), saveSnapshotFailure.cause());
 
-            context.getReplicatedLog().snapshotRollback();
-
-            LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
-                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
-                context.getReplicatedLog().getSnapshotIndex(),
-                context.getReplicatedLog().getSnapshotTerm(),
-                context.getReplicatedLog().size());
+            context.getSnapshotManager().rollback();
 
         } else if (message instanceof CaptureSnapshot) {
             LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
 
-            if(captureSnapshot == null) {
-                captureSnapshot = (CaptureSnapshot)message;
-                createSnapshot();
-            }
+            context.getSnapshotManager().create(createSnapshotProcedure);
 
-        } else if (message instanceof CaptureSnapshotReply){
+        } else if (message instanceof CaptureSnapshotReply) {
             handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
+        } else if (message.equals(COMMIT_SNAPSHOT)) {
+            commitSnapshot(-1);
         } else {
             reusableBehaviorStateHolder.init(currentBehavior);
 
@@ -416,7 +407,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog.dataSize())
                 .inMemoryJournalLogSize(replicatedLog.size())
-                .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+                .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
                 .lastApplied(context.getLastApplied())
                 .lastIndex(replicatedLog.lastIndex())
                 .lastTerm(replicatedLog.lastTerm())
@@ -515,15 +506,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             // the state to durable storage
                             self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
 
-                            // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
-                            if(!context.isSnapshotCaptureInitiated()){
-                                raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
-                                        raftContext.getTermInformation().getCurrentTerm());
-                                raftContext.getReplicatedLog().snapshotCommit();
-                            } else {
-                                LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
-                                        persistenceId(), getId());
-                            }
+                            context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+
                         } else if (clientActor != null) {
                             // Send message for replication
                             currentBehavior.handleMessage(getSelf(),
@@ -602,6 +586,41 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setConfigParams(configParams);
     }
 
+    public final DataPersistenceProvider persistence() {
+        return delegatingPersistenceProvider.getDelegate();
+    }
+
+    public void setPersistence(DataPersistenceProvider provider) {
+        delegatingPersistenceProvider.setDelegate(provider);
+    }
+
+    protected void setPersistence(boolean persistent) {
+        if(persistent) {
+            setPersistence(new PersistentDataProvider(this));
+        } else {
+            setPersistence(new NonPersistentDataProvider() {
+                /**
+                 * The way snapshotting works is,
+                 * <ol>
+                 * <li> RaftActor calls createSnapshot on the Shard
+                 * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
+                 * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
+                 * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
+                 * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
+                 * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
+                 * </ol>
+                 */
+                @Override
+                public void saveSnapshot(Object o) {
+                    // Make saving Snapshot successful
+                    // Committing the snapshot here would end up calling commit in the creating state which would
+                    // be a state violation. That's why now we send a message to commit the snapshot.
+                    self().tell(COMMIT_SNAPSHOT, self());
+                }
+            });
+        }
+    }
+
     /**
      * setPeerAddress sets the address of a known peer at a later time.
      * <p>
@@ -621,10 +640,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void commitSnapshot(long sequenceNumber) {
-        context.getReplicatedLog().snapshotCommit();
-
-        // TODO: Not sure if we want to be this aggressive with trimming stuff
-        trimPersistentData(sequenceNumber);
+        context.getSnapshotManager().commit(persistence(), sequenceNumber);
     }
 
     /**
@@ -706,8 +722,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     protected abstract void onStateChanged();
 
-    protected abstract DataPersistenceProvider persistence();
-
     /**
      * Notifier Actor for this RaftActor to notify when a role change happens
      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
@@ -716,17 +730,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
-    private void trimPersistentData(long sequenceNumber) {
-        // Trim akka snapshots
-        // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
-        // For now guessing that it is ANDed.
-        persistence().deleteSnapshots(new SnapshotSelectionCriteria(
-            sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
-
-        // Trim akka journal
-        persistence().deleteMessages(sequenceNumber);
-    }
-
     private String getLeaderAddress(){
         if(isLeader()){
             return getSelf().path().toString();
@@ -747,67 +750,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
         LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
 
-        // create a snapshot object from the state provided and save it
-        // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
-        Snapshot sn = Snapshot.create(snapshotBytes,
-            context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
-            captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
-            captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
-        persistence().saveSnapshot(sn);
-
-        LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
-
-        long dataThreshold = getTotalMemory() *
-                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-        if (context.getReplicatedLog().dataSize() > dataThreshold) {
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
-                        persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
-                        captureSnapshot.getLastAppliedIndex());
-            }
-
-            // if memory is less, clear the log based on lastApplied.
-            // this could/should only happen if one of the followers is down
-            // as normally we keep removing from the log when its replicated to all.
-            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
-                    captureSnapshot.getLastAppliedTerm());
-
-            // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
-            // install snapshot to a follower.
-            if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
-                getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
-            }
-        } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
-            // clear the log based on replicatedToAllIndex
-            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
-                    captureSnapshot.getReplicatedToAllTerm());
-
-            getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
-        } else {
-            // The replicatedToAllIndex was not found in the log
-            // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
-            // In this scenario we may need to save the snapshot to the akka persistence
-            // snapshot for recovery but we do not need to do the replicated log trimming.
-            context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
-                    replicatedLog.getSnapshotTerm());
-        }
-
-
-        LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
-            "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(),
-            replicatedLog.getSnapshotTerm());
-
-        if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
-            // this would be call straight to the leader and won't initiate in serialization
-            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
-                    ByteString.copyFrom(snapshotBytes)));
-        }
-
-        captureSnapshot = null;
-        context.setSnapshotCaptureInitiated(false);
+        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
     }
 
     protected long getTotalMemory() {
@@ -819,9 +762,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-
         private static final int DATA_SIZE_DIVIDER = 5;
-        private long dataSizeSinceLastSnapshot = 0;
+        private long dataSizeSinceLastSnapshot = 0L;
+
 
         public ReplicatedLogImpl(Snapshot snapshot) {
             super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
@@ -887,9 +830,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         long dataSizeForCheck = dataSize;
 
                         dataSizeSinceLastSnapshot += logEntrySize;
-                        long journalSize = lastIndex() + 1;
 
-                        if(!hasFollowers()) {
+                        if (!hasFollowers()) {
                             // When we do not have followers we do not maintain an in-memory log
                             // due to this the journalSize will never become anything close to the
                             // snapshot batch count. In fact will mostly be 1.
@@ -903,51 +845,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             // as if we were maintaining a real snapshot
                             dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
                         }
-
+                        long journalSize = replicatedLogEntry.getIndex() + 1;
                         long dataThreshold = getTotalMemory() *
-                                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
-                        // when a snaphsot is being taken, captureSnapshot != null
-                        if (!context.isSnapshotCaptureInitiated() &&
-                                ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
-                                        dataSizeForCheck > dataThreshold)) {
-
-                            dataSizeSinceLastSnapshot = 0;
+                                context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
 
-                            LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
-                                " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+                        if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+                                || dataSizeForCheck > dataThreshold)) {
 
-                            long lastAppliedIndex = -1;
-                            long lastAppliedTerm = -1;
-
-                            ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
-                            if (!hasFollowers()) {
-                                lastAppliedIndex = replicatedLogEntry.getIndex();
-                                lastAppliedTerm = replicatedLogEntry.getTerm();
-                            } else if (lastAppliedEntry != null) {
-                                lastAppliedIndex = lastAppliedEntry.getIndex();
-                                lastAppliedTerm = lastAppliedEntry.getTerm();
-                            }
+                            boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+                                    currentBehavior.getReplicatedToAllIndex());
 
-                            if(LOG.isDebugEnabled()) {
-                                LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
-                                LOG.debug("{}: Snapshot Capture lastApplied:{} ",
-                                        persistenceId(), context.getLastApplied());
-                                LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
-                                        lastAppliedIndex);
-                                LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
-                                        lastAppliedTerm);
+                            if(started){
+                                dataSizeSinceLastSnapshot = 0;
                             }
 
-                            // send a CaptureSnapshot to self to make the expensive operation async.
-                            long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
-                            ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-                            getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
-                                (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
-                                (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
-                                null);
-                            context.setSnapshotCaptureInitiated(true);
                         }
+
                         if (callback != null){
                             callback.apply(replicatedLogEntry);
                         }
@@ -971,46 +884,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-
-    private class ElectionTermImpl implements ElectionTerm {
-        /**
-         * Identifier of the actor whose election term information this is
-         */
-        private long currentTerm = 0;
-        private String votedFor = null;
-
-        @Override
-        public long getCurrentTerm() {
-            return currentTerm;
-        }
-
-        @Override
-        public String getVotedFor() {
-            return votedFor;
-        }
-
-        @Override public void update(long currentTerm, String votedFor) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
-            }
-            this.currentTerm = currentTerm;
-            this.votedFor = votedFor;
-        }
-
-        @Override
-        public void updateAndPersist(long currentTerm, String votedFor){
-            update(currentTerm, votedFor);
-            // FIXME : Maybe first persist then update the state
-            persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
-
-                @Override public void apply(UpdateElectionTerm param)
-                    throws Exception {
-
-                }
-            });
-        }
-    }
-
     static class UpdateElectionTerm implements Serializable {
         private static final long serialVersionUID = 1L;
         private final long currentTerm;
@@ -1030,28 +903,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
-
-        public NonPersistentRaftDataProvider(){
-
-        }
+    private class CreateSnapshotProcedure implements Procedure<Void> {
 
-        /**
-         * The way snapshotting works is,
-         * <ol>
-         * <li> RaftActor calls createSnapshot on the Shard
-         * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
-         * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
-         * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
-         * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
-         * in SaveSnapshotSuccess.
-         * </ol>
-         * @param o
-         */
         @Override
-        public void saveSnapshot(Object o) {
-            // Make saving Snapshot successful
-            commitSnapshot(-1L);
+        public void apply(Void aVoid) throws Exception {
+            createSnapshot();
         }
     }