From 641b91bad029954a55c4373f9f9001ef109c04ae Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 4 Apr 2014 19:50:57 +0200 Subject: [PATCH 1/1] BUG-509: make sure datastore commits are visible This wraps the datastore root in an AtomicReference, which makes sure started transactions see the latest published commit. This will make the datastore handling more robust under tight conditions. Also uncovers the fact that we are invoking user code under lock, which we fix by reusing the executor used by the commit machinery. Finally it uncovers thread-unsafe listener list manipuation. This will need to be addressed in a follow-up patch. Change-Id: Ic7efd266ef680701c1f0944ee675122d8527568b Signed-off-by: Robert Varga --- .../store/impl/ChangeListenerNotifyTask.java | 4 +- .../dom/store/impl/InMemoryDOMDataStore.java | 77 +++++++++++-------- .../impl/tree/ListenerRegistrationNode.java | 6 +- 3 files changed, 49 insertions(+), 38 deletions(-) diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java index 72ae24e007..306dc2390b 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java @@ -9,10 +9,10 @@ import org.slf4j.LoggerFactory; class ChangeListenerNotifyTask implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class); - private final Iterable> listeners; + private final Iterable> listeners; private final AsyncDataChangeEvent> event; - public ChangeListenerNotifyTask(final Iterable> listeners, + public ChangeListenerNotifyTask(final Iterable> listeners, final AsyncDataChangeEvent> event) { this.listeners = listeners; this.event = event; diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index 30c6ac4e7f..2091913f24 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -11,12 +11,15 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType; import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification; import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode; @@ -52,21 +55,19 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch private final ListeningExecutorService executor; private final String name; private final AtomicLong txCounter = new AtomicLong(0); - - private DataAndMetadataSnapshot snapshot; - private ModificationApplyOperation operationTree; private final ListenerRegistrationNode listenerTree; + private final AtomicReference snapshot; - + private ModificationApplyOperation operationTree; private SchemaContext schemaContext; public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) { this.name = Preconditions.checkNotNull(name); this.executor = Preconditions.checkNotNull(executor); - this.operationTree = new AlwaysFailOperation(); - this.snapshot = DataAndMetadataSnapshot.createEmpty(); this.listenerTree = ListenerRegistrationNode.createRoot(); + this.snapshot = new AtomicReference(DataAndMetadataSnapshot.createEmpty()); + this.operationTree = new AlwaysFailOperation(); } @Override @@ -76,17 +77,17 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot); + return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get()); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree); + return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree); + return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree); } @Override @@ -100,31 +101,34 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch final InstanceIdentifier path, final L listener, final DataChangeScope scope) { LOG.debug("{}: Registering data change listener {} for {}",name,listener,path); ListenerRegistrationNode listenerNode = listenerTree; - for(PathArgument arg :path.getPath()) { + for(PathArgument arg : path.getPath()) { listenerNode = listenerNode.ensureChild(arg); } - synchronized (listener) { - notifyInitialState(path, listener); - } - return listenerNode.registerDataChangeListener(path,listener, scope); - } - private void notifyInitialState(final InstanceIdentifier path, - final AsyncDataChangeListener> listener) { - Optional currentState = snapshot.read(path); - try { + /* + * Make sure commit is not occurring right now. Listener has to be registered and its + * state capture enqueued at a consistent point. + * + * FIXME: improve this to read-write lock, such that multiple listener registrations + * can occur simultaneously + */ + final DataChangeListenerRegistration reg; + synchronized (this) { + reg = listenerNode.registerDataChangeListener(path, listener, scope); + + Optional currentState = snapshot.get().read(path); if (currentState.isPresent()) { - NormalizedNode data = currentState.get().getData(); - listener.onDataChanged(DOMImmutableDataChangeEvent.builder() // + final NormalizedNode data = currentState.get().getData(); + + final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() // .setAfter(data) // .addCreated(path, data) // - .build() // - ); + .build(); + executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event)); } - } catch (Exception e) { - LOG.error("Unhandled exception encountered when invoking listener {}", listener, e); } + return reg; } private synchronized DOMStoreThreePhaseCommitCohort submit( @@ -137,23 +141,30 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch return name + "-" + txCounter.getAndIncrement(); } - private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot, + private void commit(final DataAndMetadataSnapshot currentSnapshot, final StoreMetadataNode newDataTree, final Iterable listenerTasks) { LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion()); if(LOG.isTraceEnabled()) { LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree)); } - checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs"); - snapshot = DataAndMetadataSnapshot.builder() // + + final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() // .setMetadataTree(newDataTree) // .setSchemaContext(schemaContext) // .build(); - for(ChangeListenerNotifyTask task : listenerTasks) { - executor.submit(task); - } + /* + * The commit has to occur atomically with regard to listener registrations. + */ + synchronized (this) { + final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot); + checkState(success, "Store snapshot and transaction snapshot differ. This should never happen."); + for (ChangeListenerNotifyTask task : listenerTasks) { + executor.submit(task); + } + } } private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction { @@ -294,7 +305,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture canCommit() { - final DataAndMetadataSnapshot snapshotCapture = snapshot; + final DataAndMetadataSnapshot snapshotCapture = snapshot.get(); final ModificationApplyOperation snapshotOperation = operationTree; return executor.submit(new Callable() { @@ -311,7 +322,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture preCommit() { - storeSnapshot = snapshot; + storeSnapshot = snapshot.get(); if(modification.getModificationType() == ModificationType.UNMODIFIED) { return Futures.immediateFuture(null); } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java index e48438d6fa..2528d383b9 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java @@ -9,7 +9,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -20,7 +19,7 @@ import com.google.common.base.Optional; public class ListenerRegistrationNode implements StoreTreeNode, Identifiable { - private final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class); + private static final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class); private final ListenerRegistrationNode parent; private final Map children; @@ -49,6 +48,7 @@ public class ListenerRegistrationNode implements StoreTreeNode> getListeners() { + // FIXME: this is not thread-safe and races with listener (un)registration! return (Collection) listeners; } @@ -75,7 +75,7 @@ public class ListenerRegistrationNode implements StoreTreeNode>> ListenerRegistration registerDataChangeListener(final InstanceIdentifier path, + public >> DataChangeListenerRegistration registerDataChangeListener(final InstanceIdentifier path, final L listener, final DataChangeScope scope) { DataChangeListenerRegistration listenerReg = new DataChangeListenerRegistration(path,listener, scope, this); -- 2.36.6