Merge "BUG-2673: make CDS implement DOMDataTreeChangeListener"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 8e00a1389ca6a057bef39292ffcc0b04958be794..6868cc15cd8b21dd899d3b066142fc7fbeefce23 100644 (file)
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
@@ -57,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
@@ -115,8 +115,6 @@ public class Shard extends RaftActor {
 
     private DatastoreContext datastoreContext;
 
-    private DataPersistenceProvider dataPersistenceProvider;
-
     private SchemaContext schemaContext;
 
     private int createSnapshotTransactionCounter;
@@ -144,6 +142,8 @@ public class Shard extends RaftActor {
 
     private final String txnDispatcherPath;
 
+    private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
+
     protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
@@ -151,18 +151,17 @@ public class Shard extends RaftActor {
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
         this.schemaContext = schemaContext;
-        this.dataPersistenceProvider = (datastoreContext.isPersistent())
-                ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
         this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
                 .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
 
+        setPersistence(datastoreContext.isPersistent());
 
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
 
-        if(schemaContext != null) {
+        if (schemaContext != null) {
             store.onGlobalContextUpdated(schemaContext);
         }
 
@@ -276,6 +275,8 @@ public class Shard extends RaftActor {
                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
             } else if (message instanceof RegisterChangeListener) {
                 registerChangeListener((RegisterChangeListener) message);
+            } else if (message instanceof RegisterDataTreeChangeListener) {
+                treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
             } else if (message instanceof UpdateSchemaContext) {
                 updateSchemaContext((UpdateSchemaContext) message);
             } else if (message instanceof PeerAddressResolved) {
@@ -311,12 +312,10 @@ public class Shard extends RaftActor {
 
         setTransactionCommitTimeout();
 
-        if(datastoreContext.isPersistent() &&
-                dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
-            dataPersistenceProvider = new PersistentDataProvider();
-        } else if(!datastoreContext.isPersistent() &&
-                dataPersistenceProvider instanceof PersistentDataProvider) {
-            dataPersistenceProvider = new NonPersistentRaftDataProvider();
+        if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
+            setPersistence(true);
+        } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
+            setPersistence(false);
         }
 
         updateConfigParams(datastoreContext.getShardRaftConfig());
@@ -832,6 +831,8 @@ public class Shard extends RaftActor {
     @Override
     protected void onStateChanged() {
         boolean isLeader = isLeader();
+        treeChangeSupport.onLeadershipChange(isLeader);
+
         for (ActorSelection dataChangeListener : dataChangeListeners) {
             dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
         }
@@ -859,19 +860,10 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected DataPersistenceProvider persistence() {
-        return dataPersistenceProvider;
-    }
-
-    @Override public String persistenceId() {
+    public String persistenceId() {
         return this.name;
     }
 
-    @VisibleForTesting
-    DataPersistenceProvider getDataPersistenceProvider() {
-        return dataPersistenceProvider;
-    }
-
     @VisibleForTesting
     ShardCommitCoordinator getCommitCoordinator() {
         return commitCoordinator;