Bug 2933: Implemented DataTreeChangeService
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 19804111ec1fcf42dc241c3448e67d07c108eb62..a13b6ff95ab356550bf45bff63665f61dc1e3aec 100644 (file)
@@ -29,6 +29,9 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
@@ -96,12 +99,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
 
-    private static final Procedure<ApplyJournalEntries> APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK =
-            new Procedure<ApplyJournalEntries>() {
-                @Override
-                public void apply(ApplyJournalEntries param) throws Exception {
-                }
-            };
     private static final String COMMIT_SNAPSHOT = "commit_snapshot";
 
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
@@ -118,6 +115,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     private final RaftActorContextImpl context;
 
+    private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
+
     private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
 
     /**
@@ -139,7 +138,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
          Optional<ConfigParams> configParams) {
 
         context = new RaftActorContextImpl(this.getSelf(),
-            this.getContext(), id, new ElectionTermImpl(),
+            this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
             -1, -1, replicatedLog, peerAddresses,
             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
             LOG);
@@ -339,7 +338,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
             }
 
-            persistence().persist(applyEntries, APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK);
+            persistence().persist(applyEntries, NoopProcedure.instance());
 
         } else if(message instanceof ApplySnapshot ) {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
@@ -587,6 +586,41 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setConfigParams(configParams);
     }
 
+    public final DataPersistenceProvider persistence() {
+        return delegatingPersistenceProvider.getDelegate();
+    }
+
+    public void setPersistence(DataPersistenceProvider provider) {
+        delegatingPersistenceProvider.setDelegate(provider);
+    }
+
+    protected void setPersistence(boolean persistent) {
+        if(persistent) {
+            setPersistence(new PersistentDataProvider(this));
+        } else {
+            setPersistence(new NonPersistentDataProvider() {
+                /**
+                 * The way snapshotting works is,
+                 * <ol>
+                 * <li> RaftActor calls createSnapshot on the Shard
+                 * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
+                 * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
+                 * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
+                 * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
+                 * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
+                 * </ol>
+                 */
+                @Override
+                public void saveSnapshot(Object o) {
+                    // Make saving Snapshot successful
+                    // Committing the snapshot here would end up calling commit in the creating state which would
+                    // be a state violation. That's why now we send a message to commit the snapshot.
+                    self().tell(COMMIT_SNAPSHOT, self());
+                }
+            });
+        }
+    }
+
     /**
      * setPeerAddress sets the address of a known peer at a later time.
      * <p>
@@ -688,8 +722,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     protected abstract void onStateChanged();
 
-    protected abstract DataPersistenceProvider persistence();
-
     /**
      * Notifier Actor for this RaftActor to notify when a role change happens
      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
@@ -698,17 +730,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
-    private void trimPersistentData(long sequenceNumber) {
-        // Trim akka snapshots
-        // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
-        // For now guessing that it is ANDed.
-        persistence().deleteSnapshots(new SnapshotSelectionCriteria(
-            sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
-
-        // Trim akka journal
-        persistence().deleteMessages(sequenceNumber);
-    }
-
     private String getLeaderAddress(){
         if(isLeader()){
             return getSelf().path().toString();
@@ -863,46 +884,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-
-    private class ElectionTermImpl implements ElectionTerm {
-        /**
-         * Identifier of the actor whose election term information this is
-         */
-        private long currentTerm = 0;
-        private String votedFor = null;
-
-        @Override
-        public long getCurrentTerm() {
-            return currentTerm;
-        }
-
-        @Override
-        public String getVotedFor() {
-            return votedFor;
-        }
-
-        @Override public void update(long currentTerm, String votedFor) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
-            }
-            this.currentTerm = currentTerm;
-            this.votedFor = votedFor;
-        }
-
-        @Override
-        public void updateAndPersist(long currentTerm, String votedFor){
-            update(currentTerm, votedFor);
-            // FIXME : Maybe first persist then update the state
-            persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
-
-                @Override public void apply(UpdateElectionTerm param)
-                    throws Exception {
-
-                }
-            });
-        }
-    }
-
     static class UpdateElectionTerm implements Serializable {
         private static final long serialVersionUID = 1L;
         private final long currentTerm;
@@ -922,34 +903,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
-
-        public NonPersistentRaftDataProvider(){
-
-        }
-
-        /**
-         * The way snapshotting works is,
-         * <ol>
-         * <li> RaftActor calls createSnapshot on the Shard
-         * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
-         * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
-         * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
-         * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
-         * in SaveSnapshotSuccess.
-         * </ol>
-         * @param o
-         */
-        @Override
-        public void saveSnapshot(Object o) {
-            // Make saving Snapshot successful
-            // Committing the snapshot here would end up calling commit in the creating state which would
-            // be a state violation. That's why now we send a message to commit the snapshot.
-            self().tell(COMMIT_SNAPSHOT, self());
-        }
-    }
-
-
     private class CreateSnapshotProcedure implements Procedure<Void> {
 
         @Override