BUG-509: make sure datastore commits are visible 13/5913/2
authorRobert Varga <rovarga@cisco.com>
Fri, 4 Apr 2014 17:50:57 +0000 (19:50 +0200)
committerRobert Varga <rovarga@cisco.com>
Sun, 6 Apr 2014 16:24:21 +0000 (18:24 +0200)
This wraps the datastore root in an AtomicReference, which makes sure
started transactions see the latest published commit. This will make
the datastore handling more robust under tight conditions.

Also uncovers the fact that we are invoking user code under lock, which
we fix by reusing the executor used by the commit machinery.

Finally it uncovers thread-unsafe listener list manipuation. This will
need to be addressed in a follow-up patch.

Change-Id: Ic7efd266ef680701c1f0944ee675122d8527568b
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java

index 72ae24e..306dc23 100644 (file)
@@ -9,10 +9,10 @@ import org.slf4j.LoggerFactory;
 class ChangeListenerNotifyTask implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
-    private final Iterable<DataChangeListenerRegistration<?>> listeners;
+    private final Iterable<? extends DataChangeListenerRegistration<?>> listeners;
     private final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event;
 
-    public ChangeListenerNotifyTask(final Iterable<DataChangeListenerRegistration<?>> listeners,
+    public ChangeListenerNotifyTask(final Iterable<? extends DataChangeListenerRegistration<?>> listeners,
             final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event) {
         this.listeners = listeners;
         this.event = event;
index 30c6ac4..2091913 100644 (file)
@@ -11,12 +11,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
 
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
@@ -52,21 +55,19 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
     private final ListeningExecutorService executor;
     private final String name;
     private final AtomicLong txCounter = new AtomicLong(0);
-
-    private DataAndMetadataSnapshot snapshot;
-    private ModificationApplyOperation operationTree;
     private final ListenerRegistrationNode listenerTree;
+    private final AtomicReference<DataAndMetadataSnapshot> snapshot;
 
-
+    private ModificationApplyOperation operationTree;
 
     private SchemaContext schemaContext;
 
     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
         this.name = Preconditions.checkNotNull(name);
         this.executor = Preconditions.checkNotNull(executor);
-        this.operationTree = new AlwaysFailOperation();
-        this.snapshot = DataAndMetadataSnapshot.createEmpty();
         this.listenerTree = ListenerRegistrationNode.createRoot();
+        this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
+        this.operationTree = new AlwaysFailOperation();
     }
 
     @Override
@@ -76,17 +77,17 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
+        return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+        return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
     }
 
     @Override
@@ -100,31 +101,34 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
         LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
         ListenerRegistrationNode listenerNode = listenerTree;
-        for(PathArgument arg :path.getPath()) {
+        for(PathArgument arg : path.getPath()) {
             listenerNode = listenerNode.ensureChild(arg);
         }
-        synchronized (listener) {
-            notifyInitialState(path, listener);
-        }
-        return listenerNode.registerDataChangeListener(path,listener, scope);
-    }
 
-    private void notifyInitialState(final InstanceIdentifier path,
-            final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
-        Optional<StoreMetadataNode> currentState = snapshot.read(path);
-        try {
+        /*
+         * Make sure commit is not occurring right now. Listener has to be registered and its
+         * state capture enqueued at a consistent point.
+         *
+         * FIXME: improve this to read-write lock, such that multiple listener registrations
+         *        can occur simultaneously
+         */
+        final DataChangeListenerRegistration<L> reg;
+        synchronized (this) {
+            reg = listenerNode.registerDataChangeListener(path, listener, scope);
+
+            Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
             if (currentState.isPresent()) {
-                NormalizedNode<?, ?> data = currentState.get().getData();
-                listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
+                final NormalizedNode<?, ?> data = currentState.get().getData();
+
+                final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
                         .setAfter(data) //
                         .addCreated(path, data) //
-                        .build() //
-                );
+                        .build();
+                executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
             }
-        } catch (Exception e) {
-            LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
         }
 
+        return reg;
     }
 
     private synchronized DOMStoreThreePhaseCommitCohort submit(
@@ -137,23 +141,30 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         return name + "-" + txCounter.getAndIncrement();
     }
 
-    private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
+    private void commit(final DataAndMetadataSnapshot currentSnapshot,
             final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
 
         if(LOG.isTraceEnabled()) {
             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
         }
-        checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
-        snapshot = DataAndMetadataSnapshot.builder() //
+
+        final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
                 .setMetadataTree(newDataTree) //
                 .setSchemaContext(schemaContext) //
                 .build();
 
-        for(ChangeListenerNotifyTask task : listenerTasks) {
-            executor.submit(task);
-        }
+        /*
+         * The commit has to occur atomically with regard to listener registrations.
+         */
+        synchronized (this) {
+            final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
+            checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
 
+            for (ChangeListenerNotifyTask task : listenerTasks) {
+                executor.submit(task);
+            }
+        }
     }
 
     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
@@ -294,7 +305,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Boolean> canCommit() {
-            final DataAndMetadataSnapshot snapshotCapture = snapshot;
+            final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
             final ModificationApplyOperation snapshotOperation = operationTree;
 
             return executor.submit(new Callable<Boolean>() {
@@ -311,7 +322,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Void> preCommit() {
-            storeSnapshot = snapshot;
+            storeSnapshot = snapshot.get();
             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
                 return Futures.immediateFuture(null);
             }
index e48438d..2528d38 100644 (file)
@@ -9,7 +9,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -20,7 +19,7 @@ import com.google.common.base.Optional;
 
 public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrationNode>, Identifiable<PathArgument> {
 
-    private final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class);
 
     private final ListenerRegistrationNode parent;
     private final Map<PathArgument, ListenerRegistrationNode> children;
@@ -49,6 +48,7 @@ public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrat
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public Collection<org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration<?>> getListeners() {
+        // FIXME: this is not thread-safe and races with listener (un)registration!
         return (Collection) listeners;
     }
 
@@ -75,7 +75,7 @@ public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrat
      * @param scope Scope of triggering event.
      * @return
      */
-    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
+    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
             final L listener, final DataChangeScope scope) {
 
         DataChangeListenerRegistration<L> listenerReg = new DataChangeListenerRegistration<L>(path,listener, scope, this);