BUG-509: make sure datastore commits are visible 13/5913/2
authorRobert Varga <rovarga@cisco.com>
Fri, 4 Apr 2014 17:50:57 +0000 (19:50 +0200)
committerRobert Varga <rovarga@cisco.com>
Sun, 6 Apr 2014 16:24:21 +0000 (18:24 +0200)
This wraps the datastore root in an AtomicReference, which makes sure
started transactions see the latest published commit. This will make
the datastore handling more robust under tight conditions.

Also uncovers the fact that we are invoking user code under lock, which
we fix by reusing the executor used by the commit machinery.

Finally it uncovers thread-unsafe listener list manipuation. This will
need to be addressed in a follow-up patch.

Change-Id: Ic7efd266ef680701c1f0944ee675122d8527568b
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java

index 72ae24e007ba847ba882c291b24092073a75aff7..306dc2390b9a4381067baa62506112ad42d071c7 100644 (file)
@@ -9,10 +9,10 @@ import org.slf4j.LoggerFactory;
 class ChangeListenerNotifyTask implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
 class ChangeListenerNotifyTask implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
-    private final Iterable<DataChangeListenerRegistration<?>> listeners;
+    private final Iterable<? extends DataChangeListenerRegistration<?>> listeners;
     private final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event;
 
     private final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event;
 
-    public ChangeListenerNotifyTask(final Iterable<DataChangeListenerRegistration<?>> listeners,
+    public ChangeListenerNotifyTask(final Iterable<? extends DataChangeListenerRegistration<?>> listeners,
             final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event) {
         this.listeners = listeners;
         this.event = event;
             final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event) {
         this.listeners = listeners;
         this.event = event;
index 30c6ac4e7fc94bbc45b67519075246967671f182..2091913f24949106aae128e07083a4354b992b70 100644 (file)
@@ -11,12 +11,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
 
 import static com.google.common.base.Preconditions.checkState;
 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
 
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
@@ -52,21 +55,19 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
     private final ListeningExecutorService executor;
     private final String name;
     private final AtomicLong txCounter = new AtomicLong(0);
     private final ListeningExecutorService executor;
     private final String name;
     private final AtomicLong txCounter = new AtomicLong(0);
-
-    private DataAndMetadataSnapshot snapshot;
-    private ModificationApplyOperation operationTree;
     private final ListenerRegistrationNode listenerTree;
     private final ListenerRegistrationNode listenerTree;
+    private final AtomicReference<DataAndMetadataSnapshot> snapshot;
 
 
-
+    private ModificationApplyOperation operationTree;
 
     private SchemaContext schemaContext;
 
     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
         this.name = Preconditions.checkNotNull(name);
         this.executor = Preconditions.checkNotNull(executor);
 
     private SchemaContext schemaContext;
 
     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
         this.name = Preconditions.checkNotNull(name);
         this.executor = Preconditions.checkNotNull(executor);
-        this.operationTree = new AlwaysFailOperation();
-        this.snapshot = DataAndMetadataSnapshot.createEmpty();
         this.listenerTree = ListenerRegistrationNode.createRoot();
         this.listenerTree = ListenerRegistrationNode.createRoot();
+        this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
+        this.operationTree = new AlwaysFailOperation();
     }
 
     @Override
     }
 
     @Override
@@ -76,17 +77,17 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
+        return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+        return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
     }
 
     @Override
     }
 
     @Override
@@ -100,31 +101,34 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
         LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
         ListenerRegistrationNode listenerNode = listenerTree;
             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
         LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
         ListenerRegistrationNode listenerNode = listenerTree;
-        for(PathArgument arg :path.getPath()) {
+        for(PathArgument arg : path.getPath()) {
             listenerNode = listenerNode.ensureChild(arg);
         }
             listenerNode = listenerNode.ensureChild(arg);
         }
-        synchronized (listener) {
-            notifyInitialState(path, listener);
-        }
-        return listenerNode.registerDataChangeListener(path,listener, scope);
-    }
 
 
-    private void notifyInitialState(final InstanceIdentifier path,
-            final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
-        Optional<StoreMetadataNode> currentState = snapshot.read(path);
-        try {
+        /*
+         * Make sure commit is not occurring right now. Listener has to be registered and its
+         * state capture enqueued at a consistent point.
+         *
+         * FIXME: improve this to read-write lock, such that multiple listener registrations
+         *        can occur simultaneously
+         */
+        final DataChangeListenerRegistration<L> reg;
+        synchronized (this) {
+            reg = listenerNode.registerDataChangeListener(path, listener, scope);
+
+            Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
             if (currentState.isPresent()) {
             if (currentState.isPresent()) {
-                NormalizedNode<?, ?> data = currentState.get().getData();
-                listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
+                final NormalizedNode<?, ?> data = currentState.get().getData();
+
+                final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
                         .setAfter(data) //
                         .addCreated(path, data) //
                         .setAfter(data) //
                         .addCreated(path, data) //
-                        .build() //
-                );
+                        .build();
+                executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
             }
             }
-        } catch (Exception e) {
-            LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
         }
 
         }
 
+        return reg;
     }
 
     private synchronized DOMStoreThreePhaseCommitCohort submit(
     }
 
     private synchronized DOMStoreThreePhaseCommitCohort submit(
@@ -137,23 +141,30 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         return name + "-" + txCounter.getAndIncrement();
     }
 
         return name + "-" + txCounter.getAndIncrement();
     }
 
-    private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
+    private void commit(final DataAndMetadataSnapshot currentSnapshot,
             final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
 
         if(LOG.isTraceEnabled()) {
             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
         }
             final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
 
         if(LOG.isTraceEnabled()) {
             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
         }
-        checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
-        snapshot = DataAndMetadataSnapshot.builder() //
+
+        final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
                 .setMetadataTree(newDataTree) //
                 .setSchemaContext(schemaContext) //
                 .build();
 
                 .setMetadataTree(newDataTree) //
                 .setSchemaContext(schemaContext) //
                 .build();
 
-        for(ChangeListenerNotifyTask task : listenerTasks) {
-            executor.submit(task);
-        }
+        /*
+         * The commit has to occur atomically with regard to listener registrations.
+         */
+        synchronized (this) {
+            final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
+            checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
 
 
+            for (ChangeListenerNotifyTask task : listenerTasks) {
+                executor.submit(task);
+            }
+        }
     }
 
     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
     }
 
     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
@@ -294,7 +305,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Boolean> canCommit() {
 
         @Override
         public ListenableFuture<Boolean> canCommit() {
-            final DataAndMetadataSnapshot snapshotCapture = snapshot;
+            final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
             final ModificationApplyOperation snapshotOperation = operationTree;
 
             return executor.submit(new Callable<Boolean>() {
             final ModificationApplyOperation snapshotOperation = operationTree;
 
             return executor.submit(new Callable<Boolean>() {
@@ -311,7 +322,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Void> preCommit() {
 
         @Override
         public ListenableFuture<Void> preCommit() {
-            storeSnapshot = snapshot;
+            storeSnapshot = snapshot.get();
             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
                 return Futures.immediateFuture(null);
             }
             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
                 return Futures.immediateFuture(null);
             }
index e48438d6fa63aa1888fe8ab0df580ca7881d5650..2528d383b95a0c9f6534ad0f01627f56dbc26c5c 100644 (file)
@@ -9,7 +9,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -20,7 +19,7 @@ import com.google.common.base.Optional;
 
 public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrationNode>, Identifiable<PathArgument> {
 
 
 public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrationNode>, Identifiable<PathArgument> {
 
-    private final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class);
 
     private final ListenerRegistrationNode parent;
     private final Map<PathArgument, ListenerRegistrationNode> children;
 
     private final ListenerRegistrationNode parent;
     private final Map<PathArgument, ListenerRegistrationNode> children;
@@ -49,6 +48,7 @@ public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrat
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public Collection<org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration<?>> getListeners() {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public Collection<org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration<?>> getListeners() {
+        // FIXME: this is not thread-safe and races with listener (un)registration!
         return (Collection) listeners;
     }
 
         return (Collection) listeners;
     }
 
@@ -75,7 +75,7 @@ public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrat
      * @param scope Scope of triggering event.
      * @return
      */
      * @param scope Scope of triggering event.
      * @return
      */
-    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
+    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
             final L listener, final DataChangeScope scope) {
 
         DataChangeListenerRegistration<L> listenerReg = new DataChangeListenerRegistration<L>(path,listener, scope, this);
             final L listener, final DataChangeScope scope) {
 
         DataChangeListenerRegistration<L> listenerReg = new DataChangeListenerRegistration<L>(path,listener, scope, this);