Bug 499: Improved data change listener tree management
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
index 4d5b8ee906a07640f5be07bfb8a3173b57a66bb9..0cd00cad7e6ec9609a03bac044f1ac613edf32c2 100644 (file)
@@ -16,6 +16,8 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
@@ -43,22 +45,27 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
+    private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
 
 
+
+    private final ListeningExecutorService executor;
+    private final String name;
     private final AtomicLong txCounter = new AtomicLong(0);
 
     private DataAndMetadataSnapshot snapshot;
     private final AtomicLong txCounter = new AtomicLong(0);
 
     private DataAndMetadataSnapshot snapshot;
-    private ModificationApplyOperation operation;
+    private ModificationApplyOperation operationTree;
+    private final ListenerRegistrationNode listenerTree;
+
 
 
-    private final ListeningExecutorService executor;
-    private final String name;
 
     private SchemaContext schemaContext;
 
     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
         this.executor = executor;
         this.name = name;
 
     private SchemaContext schemaContext;
 
     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
         this.executor = executor;
         this.name = name;
-        this.operation = new AllwaysFailOperation();
+        this.operationTree = new AllwaysFailOperation();
         this.snapshot = DataAndMetadataSnapshot.createEmpty();
         this.snapshot = DataAndMetadataSnapshot.createEmpty();
+        this.listenerTree = ListenerRegistrationNode.createRoot();
     }
 
     @Override
     }
 
     @Override
@@ -73,35 +80,81 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operation);
+        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operation);
+        return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
     }
 
     @Override
     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
     }
 
     @Override
     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
-        operation = SchemaAwareApplyOperationRoot.from(ctx);
+        operationTree = SchemaAwareApplyOperationRoot.from(ctx);
         schemaContext = ctx;
     }
 
     @Override
     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
         schemaContext = ctx;
     }
 
     @Override
     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
-        return null;
+        LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
+        ListenerRegistrationNode listenerNode = listenerTree;
+        for(PathArgument arg :path.getPath()) {
+            listenerNode = listenerNode.ensureChild(arg);
+        }
+        synchronized (listener) {
+            notifyInitialState(path, listener);
+        }
+        return listenerNode.registerDataChangeListener(path,listener, scope);
+    }
+
+    private void notifyInitialState(final InstanceIdentifier path,
+            final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
+        Optional<StoreMetadataNode> currentState = snapshot.read(path);
+        try {
+            if (currentState.isPresent()) {
+                NormalizedNode<?, ?> data = currentState.get().getData();
+                listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
+                        .setAfter(data) //
+                        .addCreated(path, data) //
+                        .build() //
+                );
+            }
+        } catch (Exception e) {
+            LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
+        }
+
     }
 
     private synchronized DOMStoreThreePhaseCommitCohort submit(
     }
 
     private synchronized DOMStoreThreePhaseCommitCohort submit(
-            final SnaphostBackedWriteTransaction snaphostBackedWriteTransaction) {
-        return new ThreePhaseCommitImpl(snaphostBackedWriteTransaction);
+            final SnaphostBackedWriteTransaction writeTx) {
+        LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
+        return new ThreePhaseCommitImpl(writeTx);
     }
 
     private Object nextIdentifier() {
         return name + "-" + txCounter.getAndIncrement();
     }
 
     }
 
     private Object nextIdentifier() {
         return name + "-" + txCounter.getAndIncrement();
     }
 
+    private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
+            final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
+        LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
+
+        if(LOG.isTraceEnabled()) {
+            LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
+        }
+        checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
+        snapshot = DataAndMetadataSnapshot.builder() //
+                .setMetadataTree(newDataTree) //
+                .setSchemaContext(schemaContext) //
+                .build();
+
+        for(ChangeListenerNotifyTask task : listenerTasks) {
+            executor.submit(task);
+        }
+
+    }
+
     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
 
         private DataAndMetadataSnapshot stableSnapshot;
     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
 
         private DataAndMetadataSnapshot stableSnapshot;
@@ -110,6 +163,8 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
             this.identifier = identifier;
             this.stableSnapshot = snapshot;
         public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
             this.identifier = identifier;
             this.stableSnapshot = snapshot;
+            LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
+
         }
 
         @Override
         }
 
         @Override
@@ -149,6 +204,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             this.identifier = identifier;
             mutableTree = MutableDataTree.from(snapshot, applyOper);
             this.store = store;
             this.identifier = identifier;
             mutableTree = MutableDataTree.from(snapshot, applyOper);
             this.store = store;
+            LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
         }
 
         @Override
         }
 
         @Override
@@ -185,7 +241,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         @Override
         public synchronized DOMStoreThreePhaseCommitCohort ready() {
             ready = true;
         @Override
         public synchronized DOMStoreThreePhaseCommitCohort ready() {
             ready = true;
-            LOG.debug("Store transaction: {} : Ready",getIdentifier());
+            LOG.debug("Store transaction: {} : Ready", getIdentifier());
             mutableTree.seal();
             return store.submit(this);
         }
             mutableTree.seal();
             return store.submit(this);
         }
@@ -228,6 +284,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         private DataAndMetadataSnapshot storeSnapshot;
         private Optional<StoreMetadataNode> proposedSubtree;
 
         private DataAndMetadataSnapshot storeSnapshot;
         private Optional<StoreMetadataNode> proposedSubtree;
+        private Iterable<ChangeListenerNotifyTask> listenerTasks;
 
         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
             this.transaction = writeTransaction;
 
         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
             this.transaction = writeTransaction;
@@ -237,14 +294,15 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         @Override
         public ListenableFuture<Boolean> canCommit() {
             final DataAndMetadataSnapshot snapshotCapture = snapshot;
         @Override
         public ListenableFuture<Boolean> canCommit() {
             final DataAndMetadataSnapshot snapshotCapture = snapshot;
-            final ModificationApplyOperation snapshotOperation = operation;
+            final ModificationApplyOperation snapshotOperation = operationTree;
 
             return executor.submit(new Callable<Boolean>() {
 
                 @Override
                 public Boolean call() throws Exception {
 
             return executor.submit(new Callable<Boolean>() {
 
                 @Override
                 public Boolean call() throws Exception {
-                    boolean applicable = snapshotOperation.isApplicable(modification, Optional.of(snapshotCapture.getMetadataTree()));
-                    LOG.debug("Store Transcation: {} : canCommit : {}",transaction.getIdentifier(),applicable);
+                    boolean applicable = snapshotOperation.isApplicable(modification,
+                            Optional.of(snapshotCapture.getMetadataTree()));
+                    LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
                     return applicable;
                 }
             });
                     return applicable;
                 }
             });
@@ -253,12 +311,28 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         @Override
         public ListenableFuture<Void> preCommit() {
             storeSnapshot = snapshot;
         @Override
         public ListenableFuture<Void> preCommit() {
             storeSnapshot = snapshot;
+            if(modification.getModificationType() == ModificationType.UNMODIFIED) {
+                return Futures.immediateFuture(null);
+            }
             return executor.submit(new Callable<Void>() {
 
             return executor.submit(new Callable<Void>() {
 
+
+
                 @Override
                 public Void call() throws Exception {
                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
                 @Override
                 public Void call() throws Exception {
                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
-                    proposedSubtree = operation.apply(modification, Optional.of(metadataTree),increase(metadataTree.getSubtreeVersion()));
+
+                    proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
+                            increase(metadataTree.getSubtreeVersion()));
+
+                    listenerTasks = DataChangeEventResolver.create() //
+                            .setRootPath(PUBLIC_ROOT_PATH) //
+                            .setBeforeRoot(Optional.of(metadataTree)) //
+                            .setAfterRoot(proposedSubtree) //
+                            .setModificationRoot(modification) //
+                            .setListenerRoot(listenerTree) //
+                            .resolve();
+
                     return null;
                 }
             });
                     return null;
                 }
             });
@@ -273,28 +347,24 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Void> commit() {
 
         @Override
         public ListenableFuture<Void> commit() {
-            checkState(proposedSubtree != null);
-            checkState(storeSnapshot != null);
+            if(modification.getModificationType() == ModificationType.UNMODIFIED) {
+                return Futures.immediateFuture(null);
+            }
+
+            checkState(proposedSubtree != null,"Proposed subtree must be computed");
+            checkState(storeSnapshot != null,"Proposed subtree must be computed");
             // return ImmediateFuture<>;
             // return ImmediateFuture<>;
-            InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree);
+            InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
             return Futures.<Void> immediateFuture(null);
         }
 
     }
 
             return Futures.<Void> immediateFuture(null);
         }
 
     }
 
-    private synchronized void commit(final DataAndMetadataSnapshot storeSnapshot,
-            final Optional<StoreMetadataNode> proposedSubtree) {
-        //LOG.info("Updating Store snaphot.");
-        checkState(snapshot == storeSnapshot, "Store snapshot and transaction snapshot differs");
-        snapshot = DataAndMetadataSnapshot.builder().setMetadataTree(proposedSubtree.get())
-                .setSchemaContext(schemaContext).build();
-    }
-
     private class AllwaysFailOperation implements ModificationApplyOperation {
 
         @Override
         public Optional<StoreMetadataNode> apply(final NodeModification modification,
     private class AllwaysFailOperation implements ModificationApplyOperation {
 
         @Override
         public Optional<StoreMetadataNode> apply(final NodeModification modification,
-                final Optional<StoreMetadataNode> storeMeta,final UnsignedLong subtreeVersion) {
+                final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
             throw new IllegalStateException("Schema Context is not available.");
         }
 
             throw new IllegalStateException("Schema Context is not available.");
         }