Merge "Fixed typo in SnapshotBackedWriteTransaction class"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
index 30c6ac4e7fc94bbc45b67519075246967671f182..0a2b66c2d0112a35e4173d11ac8c7e06de1f73c2 100644 (file)
@@ -11,12 +11,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
 
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
@@ -24,7 +26,9 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
@@ -36,6 +40,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
+import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
@@ -52,21 +58,19 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
     private final ListeningExecutorService executor;
     private final String name;
     private final AtomicLong txCounter = new AtomicLong(0);
+    private final ListenerTree listenerTree;
+    private final AtomicReference<DataAndMetadataSnapshot> snapshot;
 
-    private DataAndMetadataSnapshot snapshot;
     private ModificationApplyOperation operationTree;
-    private final ListenerRegistrationNode listenerTree;
-
-
 
     private SchemaContext schemaContext;
 
     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
         this.name = Preconditions.checkNotNull(name);
         this.executor = Preconditions.checkNotNull(executor);
+        this.listenerTree = ListenerTree.create();
+        this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
         this.operationTree = new AlwaysFailOperation();
-        this.snapshot = DataAndMetadataSnapshot.createEmpty();
-        this.listenerTree = ListenerRegistrationNode.createRoot();
     }
 
     @Override
@@ -76,17 +80,17 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
+        return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+        return new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
     }
 
     @Override
@@ -98,37 +102,44 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
     @Override
     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
-        LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
-        ListenerRegistrationNode listenerNode = listenerTree;
-        for(PathArgument arg :path.getPath()) {
-            listenerNode = listenerNode.ensureChild(arg);
-        }
-        synchronized (listener) {
-            notifyInitialState(path, listener);
-        }
-        return listenerNode.registerDataChangeListener(path,listener, scope);
-    }
 
-    private void notifyInitialState(final InstanceIdentifier path,
-            final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
-        Optional<StoreMetadataNode> currentState = snapshot.read(path);
-        try {
+        /*
+         * Make sure commit is not occurring right now. Listener has to be registered and its
+         * state capture enqueued at a consistent point.
+         *
+         * FIXME: improve this to read-write lock, such that multiple listener registrations
+         *        can occur simultaneously
+         */
+        final DataChangeListenerRegistration<L> reg;
+        synchronized (this) {
+            LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
+
+            reg = listenerTree.registerDataChangeListener(path, listener, scope);
+
+            Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
             if (currentState.isPresent()) {
-                NormalizedNode<?, ?> data = currentState.get().getData();
-                listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
+                final NormalizedNode<?, ?> data = currentState.get().getData();
+
+                final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
                         .setAfter(data) //
                         .addCreated(path, data) //
-                        .build() //
-                );
+                        .build();
+                executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
             }
-        } catch (Exception e) {
-            LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
         }
 
+        return new AbstractListenerRegistration<L>(listener) {
+            @Override
+            protected void removeRegistration() {
+                synchronized (InMemoryDOMDataStore.this) {
+                    reg.close();
+                }
+            }
+        };
     }
 
     private synchronized DOMStoreThreePhaseCommitCohort submit(
-            final SnaphostBackedWriteTransaction writeTx) {
+            final SnapshotBackedWriteTransaction writeTx) {
         LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
         return new ThreePhaseCommitImpl(writeTx);
     }
@@ -137,44 +148,73 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         return name + "-" + txCounter.getAndIncrement();
     }
 
-    private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
-            final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
+    private void commit(final DataAndMetadataSnapshot currentSnapshot,
+            final StoreMetadataNode newDataTree, final ResolveDataChangeEventsTask listenerResolver) {
         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
 
         if(LOG.isTraceEnabled()) {
             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
         }
-        checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
-        snapshot = DataAndMetadataSnapshot.builder() //
+
+        final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
                 .setMetadataTree(newDataTree) //
                 .setSchemaContext(schemaContext) //
                 .build();
 
-        for(ChangeListenerNotifyTask task : listenerTasks) {
-            executor.submit(task);
-        }
+        /*
+         * The commit has to occur atomically with regard to listener registrations.
+         */
+        synchronized (this) {
+            final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
+            checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
 
+            for (ChangeListenerNotifyTask task : listenerResolver.call()) {
+                LOG.trace("Scheduling invocation of listeners: {}",task);
+                executor.submit(task);
+            }
+        }
     }
 
-    private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
-
-        private DataAndMetadataSnapshot stableSnapshot;
+    private static abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
         private final Object identifier;
 
-        public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
+        protected AbstractDOMStoreTransaction(final Object identifier) {
             this.identifier = identifier;
-            this.stableSnapshot = snapshot;
-            LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
-
         }
 
         @Override
-        public Object getIdentifier() {
+        public final Object getIdentifier() {
             return identifier;
         }
 
+        @Override
+        public final String toString() {
+            return addToStringAttributes(Objects.toStringHelper(this)).toString();
+        }
+
+        /**
+         * Add class-specific toString attributes.
+         *
+         * @param toStringHelper ToStringHelper instance
+         * @return ToStringHelper instance which was passed in
+         */
+        protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+            return toStringHelper.add("id", identifier);
+        }
+    }
+
+    private static class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements DOMStoreReadTransaction {
+        private DataAndMetadataSnapshot stableSnapshot;
+
+        public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
+            super(identifier);
+            this.stableSnapshot = Preconditions.checkNotNull(snapshot);
+            LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree().getSubtreeVersion());
+        }
+
         @Override
         public void close() {
+            LOG.debug("Store transaction: {} : Closed", getIdentifier());
             stableSnapshot = null;
         }
 
@@ -184,37 +224,24 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             checkState(stableSnapshot != null, "Transaction is closed");
             return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
         }
-
-        @Override
-        public String toString() {
-            return "SnapshotBackedReadTransaction [id =" + identifier + "]";
-        }
-
     }
 
-    private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
-
+    private static class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction implements DOMStoreWriteTransaction {
         private MutableDataTree mutableTree;
-        private final Object identifier;
         private InMemoryDOMDataStore store;
-
         private boolean ready = false;
 
-        public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
+        public SnapshotBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
-            this.identifier = identifier;
+            super(identifier);
             mutableTree = MutableDataTree.from(snapshot, applyOper);
             this.store = store;
             LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
         }
 
-        @Override
-        public Object getIdentifier() {
-            return identifier;
-        }
-
         @Override
         public void close() {
+            LOG.debug("Store transaction: {} : Closed", getIdentifier());
             this.mutableTree = null;
             this.store = null;
         }
@@ -222,26 +249,52 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         @Override
         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
             checkNotReady();
-            mutableTree.write(path, data);
+            try {
+                LOG.trace("Tx: {} Write: {}:{}",getIdentifier(),path,data);
+                mutableTree.write(path, data);
+              // FIXME: Add checked exception
+            } catch (Exception e) {
+                LOG.error("Tx: {}, failed to write {}:{} in {}",getIdentifier(),path,data,mutableTree,e);
+            }
+        }
+
+        @Override
+        public void merge(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+            checkNotReady();
+            try {
+                LOG.trace("Tx: {} Merge: {}:{}",getIdentifier(),path,data);
+                mutableTree.merge(path, data);
+              // FIXME: Add checked exception
+            } catch (Exception e) {
+                LOG.error("Tx: {}, failed to write {}:{} in {}",getIdentifier(),path,data,mutableTree,e);
+            }
         }
 
         @Override
         public void delete(final InstanceIdentifier path) {
             checkNotReady();
-            mutableTree.delete(path);
+            try {
+                LOG.trace("Tx: {} Delete: {}",getIdentifier(),path);
+                mutableTree.delete(path);
+             // FIXME: Add checked exception
+            } catch (Exception e) {
+                LOG.error("Tx: {}, failed to delete {} in {}",getIdentifier(),path,mutableTree,e);
+            }
         }
 
-        protected boolean isReady() {
+        protected final boolean isReady() {
             return ready;
         }
 
-        protected void checkNotReady() {
-            checkState(!ready, "Transaction is ready. No further modifications allowed.");
+        protected final void checkNotReady() {
+            checkState(!ready, "Transaction %s is ready. No further modifications allowed.", getIdentifier());
         }
 
         @Override
         public synchronized DOMStoreThreePhaseCommitCohort ready() {
+            checkState(!ready, "Transaction %s is already ready.", getIdentifier());
             ready = true;
+
             LOG.debug("Store transaction: {} : Ready", getIdentifier());
             mutableTree.seal();
             return store.submit(this);
@@ -252,13 +305,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         }
 
         @Override
-        public String toString() {
-            return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
+        protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+            return toStringHelper.add("ready", isReady());
         }
-
     }
 
-    private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
+    private static class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements
             DOMStoreReadWriteTransaction {
 
         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
@@ -268,33 +320,33 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
-            return Futures.immediateFuture(getMutatedView().read(path));
-        }
-
-        @Override
-        public String toString() {
-            return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
+            LOG.trace("Tx: {} Read: {}",getIdentifier(),path);
+            try {
+                return Futures.immediateFuture(getMutatedView().read(path));
+            } catch (Exception e) {
+                LOG.error("Tx: {} Failed Read of {}",getIdentifier(),path,e);
+                throw e;
+            }
         }
-
     }
 
     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
 
-        private final SnaphostBackedWriteTransaction transaction;
+        private final SnapshotBackedWriteTransaction transaction;
         private final NodeModification modification;
 
         private DataAndMetadataSnapshot storeSnapshot;
         private Optional<StoreMetadataNode> proposedSubtree;
-        private Iterable<ChangeListenerNotifyTask> listenerTasks;
+        private ResolveDataChangeEventsTask listenerResolver;
 
-        public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
+        public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
             this.transaction = writeTransaction;
             this.modification = transaction.getMutatedView().getRootModification();
         }
 
         @Override
         public ListenableFuture<Boolean> canCommit() {
-            final DataAndMetadataSnapshot snapshotCapture = snapshot;
+            final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
             final ModificationApplyOperation snapshotOperation = operationTree;
 
             return executor.submit(new Callable<Boolean>() {
@@ -311,7 +363,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Void> preCommit() {
-            storeSnapshot = snapshot;
+            storeSnapshot = snapshot.get();
             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
                 return Futures.immediateFuture(null);
             }
@@ -326,13 +378,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
                             increase(metadataTree.getSubtreeVersion()));
 
-                    listenerTasks = DataChangeEventResolver.create() //
+                    listenerResolver = ResolveDataChangeEventsTask.create() //
                             .setRootPath(PUBLIC_ROOT_PATH) //
                             .setBeforeRoot(Optional.of(metadataTree)) //
                             .setAfterRoot(proposedSubtree) //
                             .setModificationRoot(modification) //
-                            .setListenerRoot(listenerTree) //
-                            .resolve();
+                            .setListenerRoot(listenerTree);
 
                     return null;
                 }
@@ -355,7 +406,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             checkState(proposedSubtree != null,"Proposed subtree must be computed");
             checkState(storeSnapshot != null,"Proposed subtree must be computed");
             // return ImmediateFuture<>;
-            InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
+            InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerResolver);
             return Futures.<Void> immediateFuture(null);
         }