Merge "Fix bug 666 - Neutron Subnet Creation Error"
authorMadhu Venugopal <mavenugo@gmail.com>
Mon, 7 Apr 2014 22:38:36 +0000 (22:38 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 7 Apr 2014 22:38:36 +0000 (22:38 +0000)
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DiscoveryService.java

index 72ae24e007ba847ba882c291b24092073a75aff7..306dc2390b9a4381067baa62506112ad42d071c7 100644 (file)
@@ -9,10 +9,10 @@ import org.slf4j.LoggerFactory;
 class ChangeListenerNotifyTask implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
-    private final Iterable<DataChangeListenerRegistration<?>> listeners;
+    private final Iterable<? extends DataChangeListenerRegistration<?>> listeners;
     private final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event;
 
-    public ChangeListenerNotifyTask(final Iterable<DataChangeListenerRegistration<?>> listeners,
+    public ChangeListenerNotifyTask(final Iterable<? extends DataChangeListenerRegistration<?>> listeners,
             final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event) {
         this.listeners = listeners;
         this.event = event;
index a032c798c62d8fbe0151ab5406a23dab54b0af02..0948f6d3e56f9bd285240c39dc9a6b2f80039b6f 100644 (file)
@@ -184,6 +184,9 @@ public class DataChangeEventResolver {
                 subtree.merge(resolveSubtreeChangeEvent(childPath, childListen, childMod, childBefore.get(),
                         childAfter.get()));
                 break;
+            case UNMODIFIED:
+                // no-op
+                break;
             }
         }
         DOMImmutableDataChangeEvent oneChangeEvent = one.build();
index 0cd00cad7e6ec9609a03bac044f1ac613edf32c2..2091913f24949106aae128e07083a4354b992b70 100644 (file)
@@ -11,12 +11,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
 
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
@@ -37,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -51,41 +55,39 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
     private final ListeningExecutorService executor;
     private final String name;
     private final AtomicLong txCounter = new AtomicLong(0);
-
-    private DataAndMetadataSnapshot snapshot;
-    private ModificationApplyOperation operationTree;
     private final ListenerRegistrationNode listenerTree;
+    private final AtomicReference<DataAndMetadataSnapshot> snapshot;
 
-
+    private ModificationApplyOperation operationTree;
 
     private SchemaContext schemaContext;
 
     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
-        this.executor = executor;
-        this.name = name;
-        this.operationTree = new AllwaysFailOperation();
-        this.snapshot = DataAndMetadataSnapshot.createEmpty();
+        this.name = Preconditions.checkNotNull(name);
+        this.executor = Preconditions.checkNotNull(executor);
         this.listenerTree = ListenerRegistrationNode.createRoot();
+        this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
+        this.operationTree = new AlwaysFailOperation();
     }
 
     @Override
-    public String getIdentifier() {
+    public final String getIdentifier() {
         return name;
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
+        return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+        return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
     }
 
     @Override
@@ -99,31 +101,34 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
         LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
         ListenerRegistrationNode listenerNode = listenerTree;
-        for(PathArgument arg :path.getPath()) {
+        for(PathArgument arg : path.getPath()) {
             listenerNode = listenerNode.ensureChild(arg);
         }
-        synchronized (listener) {
-            notifyInitialState(path, listener);
-        }
-        return listenerNode.registerDataChangeListener(path,listener, scope);
-    }
 
-    private void notifyInitialState(final InstanceIdentifier path,
-            final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
-        Optional<StoreMetadataNode> currentState = snapshot.read(path);
-        try {
+        /*
+         * Make sure commit is not occurring right now. Listener has to be registered and its
+         * state capture enqueued at a consistent point.
+         *
+         * FIXME: improve this to read-write lock, such that multiple listener registrations
+         *        can occur simultaneously
+         */
+        final DataChangeListenerRegistration<L> reg;
+        synchronized (this) {
+            reg = listenerNode.registerDataChangeListener(path, listener, scope);
+
+            Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
             if (currentState.isPresent()) {
-                NormalizedNode<?, ?> data = currentState.get().getData();
-                listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
+                final NormalizedNode<?, ?> data = currentState.get().getData();
+
+                final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
                         .setAfter(data) //
                         .addCreated(path, data) //
-                        .build() //
-                );
+                        .build();
+                executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
             }
-        } catch (Exception e) {
-            LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
         }
 
+        return reg;
     }
 
     private synchronized DOMStoreThreePhaseCommitCohort submit(
@@ -136,23 +141,30 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         return name + "-" + txCounter.getAndIncrement();
     }
 
-    private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
+    private void commit(final DataAndMetadataSnapshot currentSnapshot,
             final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
 
         if(LOG.isTraceEnabled()) {
             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
         }
-        checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
-        snapshot = DataAndMetadataSnapshot.builder() //
+
+        final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
                 .setMetadataTree(newDataTree) //
                 .setSchemaContext(schemaContext) //
                 .build();
 
-        for(ChangeListenerNotifyTask task : listenerTasks) {
-            executor.submit(task);
-        }
+        /*
+         * The commit has to occur atomically with regard to listener registrations.
+         */
+        synchronized (this) {
+            final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
+            checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
 
+            for (ChangeListenerNotifyTask task : listenerTasks) {
+                executor.submit(task);
+            }
+        }
     }
 
     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
@@ -293,7 +305,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Boolean> canCommit() {
-            final DataAndMetadataSnapshot snapshotCapture = snapshot;
+            final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
             final ModificationApplyOperation snapshotOperation = operationTree;
 
             return executor.submit(new Callable<Boolean>() {
@@ -310,7 +322,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Void> preCommit() {
-            storeSnapshot = snapshot;
+            storeSnapshot = snapshot.get();
             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
                 return Futures.immediateFuture(null);
             }
@@ -360,7 +372,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
     }
 
-    private class AllwaysFailOperation implements ModificationApplyOperation {
+    private static final class AlwaysFailOperation implements ModificationApplyOperation {
 
         @Override
         public Optional<StoreMetadataNode> apply(final NodeModification modification,
index e48438d6fa63aa1888fe8ab0df580ca7881d5650..2528d383b95a0c9f6534ad0f01627f56dbc26c5c 100644 (file)
@@ -9,7 +9,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -20,7 +19,7 @@ import com.google.common.base.Optional;
 
 public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrationNode>, Identifiable<PathArgument> {
 
-    private final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class);
 
     private final ListenerRegistrationNode parent;
     private final Map<PathArgument, ListenerRegistrationNode> children;
@@ -49,6 +48,7 @@ public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrat
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public Collection<org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration<?>> getListeners() {
+        // FIXME: this is not thread-safe and races with listener (un)registration!
         return (Collection) listeners;
     }
 
@@ -75,7 +75,7 @@ public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrat
      * @param scope Scope of triggering event.
      * @return
      */
-    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
+    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
             final L listener, final DataChangeScope scope) {
 
         DataChangeListenerRegistration<L> listenerReg = new DataChangeListenerRegistration<L>(path,listener, scope, this);
index 548bfb1f9fb26eb93ab4ca5bcd456d9bdc4082c3..fc668b3f4d0f1a1f01bb3a9b096b6bc7440515c0 100644 (file)
@@ -945,7 +945,7 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
         if (!src.getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
             if (type == UpdateType.ADDED) {
                 edgeMap.put(dst, edge);
-            } else {
+            } else if (type == UpdateType.REMOVED) {
                 edgeMap.remove(dst);
             }
         } else {
@@ -954,7 +954,7 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
              */
             if (type == UpdateType.ADDED) {
                 prodMap.put(dst, edge);
-            } else {
+            } else if (type == UpdateType.REMOVED) {
                 prodMap.remove(dst);
             }
         }