From: Robert Varga Date: Tue, 8 Apr 2014 16:07:32 +0000 (+0200) Subject: BUG-509: Fix thread safety of listener registration X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~259 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=a55afc7157bfc72ddd6d639c3842768763933a57 BUG-509: Fix thread safety of listener registration This commit fixes the race condition where a listener is registered after preCommit() and before commit(). This is done by moving the collection into the commit() where it is protected by the commit/listener synchronized block. The unregistration safety is handled by wrapping the returned registration and invoking the equivalent synchronized block. Change-Id: Ie9abc81b2a773418b34c3051f7665b0dcf047f76 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java index 0948f6d3e5..f231bb5c39 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java @@ -4,8 +4,9 @@ import static org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableData import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.append; import static org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils.getChild; -import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder; @@ -23,17 +24,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; public class DataChangeEventResolver { - - - private static final Logger LOG = LoggerFactory.getLogger(DataChangeEventResolver.class); - + private static final Logger LOG = LoggerFactory.getLogger(DataChangeEventResolver.class); private static final DOMImmutableDataChangeEvent NO_CHANGE = builder().build(); + private final ImmutableList.Builder tasks = ImmutableList.builder(); private InstanceIdentifier rootPath; private ListenerRegistrationNode listenerRoot; private NodeModification modificationRoot; private Optional beforeRoot; private Optional afterRoot; - private final ImmutableList.Builder tasks = ImmutableList.builder(); protected InstanceIdentifier getRootPath() { return rootPath; @@ -133,11 +131,7 @@ public class DataChangeEventResolver { builder.merge(resolveCreateEvent(childPath, childListeners, child)); } - DOMImmutableDataChangeEvent event = builder.build(); - if (listeners.isPresent()) { - addNotifyTask(listeners.get().getListeners(), event); - } - return event; + return addNotifyTask(listeners, builder.build()); } private DOMImmutableDataChangeEvent resolveDeleteEvent(final InstanceIdentifier path, @@ -151,12 +145,7 @@ public class DataChangeEventResolver { InstanceIdentifier childPath = StoreUtils.append(path, childId); builder.merge(resolveDeleteEvent(childPath, childListeners, child)); } - DOMImmutableDataChangeEvent event = builder.build(); - if (listeners.isPresent()) { - addNotifyTask(listeners.get().getListeners(), event); - } - return event; - + return addNotifyTask(listeners, builder.build()); } private DOMImmutableDataChangeEvent resolveSubtreeChangeEvent(final InstanceIdentifier path, @@ -206,33 +195,35 @@ public class DataChangeEventResolver { return builder().build(); } - private void addNotifyTask(final ListenerRegistrationNode listenerRegistrationNode, final DataChangeScope scope, - final DOMImmutableDataChangeEvent event) { - Collection> potential = listenerRegistrationNode.getListeners(); - if(potential.isEmpty()) { - return; - } - ArrayList> toNotify = new ArrayList<>(potential.size()); - for(DataChangeListenerRegistration listener : potential) { - if(scope.equals(listener.getScope())) { - toNotify.add(listener); + private DOMImmutableDataChangeEvent addNotifyTask(final Optional listeners, final DOMImmutableDataChangeEvent event) { + if (listeners.isPresent()) { + final Collection> l = listeners.get().getListeners(); + if (!l.isEmpty()) { + tasks.add(new ChangeListenerNotifyTask(ImmutableSet.copyOf(l), event)); } } - addNotifyTask(toNotify, event); + return event; } - private void addNotifyTask(final Collection> listeners, + private void addNotifyTask(final ListenerRegistrationNode listenerRegistrationNode, final DataChangeScope scope, final DOMImmutableDataChangeEvent event) { - if(!listeners.isEmpty()) { - tasks.add(new ChangeListenerNotifyTask(ImmutableSet.copyOf(listeners),event)); + Collection> potential = listenerRegistrationNode.getListeners(); + if(!potential.isEmpty()) { + final Set> toNotify = new HashSet<>(potential.size()); + for(DataChangeListenerRegistration listener : potential) { + if(scope.equals(listener.getScope())) { + toNotify.add(listener); + } + } + + if (!toNotify.isEmpty()) { + tasks.add(new ChangeListenerNotifyTask(toNotify, event)); + } } } public static DataChangeEventResolver create() { return new DataChangeEventResolver(); } - - - } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index 788bc68f03..de0c146392 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -29,6 +29,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; @@ -102,11 +103,6 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public >> ListenerRegistration registerChangeListener( final InstanceIdentifier path, final L listener, final DataChangeScope scope) { - LOG.debug("{}: Registering data change listener {} for {}",name,listener,path); - ListenerRegistrationNode listenerNode = listenerTree; - for(PathArgument arg : path.getPath()) { - listenerNode = listenerNode.ensureChild(arg); - } /* * Make sure commit is not occurring right now. Listener has to be registered and its @@ -117,6 +113,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch */ final DataChangeListenerRegistration reg; synchronized (this) { + LOG.debug("{}: Registering data change listener {} for {}",name,listener,path); + ListenerRegistrationNode listenerNode = listenerTree; + for(PathArgument arg : path.getPath()) { + listenerNode = listenerNode.ensureChild(arg); + } + reg = listenerNode.registerDataChangeListener(path, listener, scope); Optional currentState = snapshot.get().read(path); @@ -131,7 +133,14 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch } } - return reg; + return new AbstractListenerRegistration(listener) { + @Override + protected void removeRegistration() { + synchronized (InMemoryDOMDataStore.this) { + reg.close(); + } + } + }; } private synchronized DOMStoreThreePhaseCommitCohort submit( @@ -145,7 +154,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch } private void commit(final DataAndMetadataSnapshot currentSnapshot, - final StoreMetadataNode newDataTree, final Iterable listenerTasks) { + final StoreMetadataNode newDataTree, final DataChangeEventResolver listenerResolver) { LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion()); if(LOG.isTraceEnabled()) { @@ -164,7 +173,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot); checkState(success, "Store snapshot and transaction snapshot differ. This should never happen."); - for (ChangeListenerNotifyTask task : listenerTasks) { + for (ChangeListenerNotifyTask task : listenerResolver.resolve()) { executor.submit(task); } } @@ -302,7 +311,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch private DataAndMetadataSnapshot storeSnapshot; private Optional proposedSubtree; - private Iterable listenerTasks; + private DataChangeEventResolver listenerResolver; public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) { this.transaction = writeTransaction; @@ -343,13 +352,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree), increase(metadataTree.getSubtreeVersion())); - listenerTasks = DataChangeEventResolver.create() // + listenerResolver = DataChangeEventResolver.create() // .setRootPath(PUBLIC_ROOT_PATH) // .setBeforeRoot(Optional.of(metadataTree)) // .setAfterRoot(proposedSubtree) // .setModificationRoot(modification) // - .setListenerRoot(listenerTree) // - .resolve(); + .setListenerRoot(listenerTree); return null; } @@ -372,7 +380,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch checkState(proposedSubtree != null,"Proposed subtree must be computed"); checkState(storeSnapshot != null,"Proposed subtree must be computed"); // return ImmediateFuture<>; - InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks); + InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerResolver); return Futures. immediateFuture(null); } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java index 2528d383b9..854c125af1 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java @@ -46,9 +46,16 @@ public class ListenerRegistrationNode implements StoreTreeNode> getListeners() { - // FIXME: this is not thread-safe and races with listener (un)registration! return (Collection) listeners; } @@ -67,7 +74,6 @@ public class ListenerRegistrationNode implements StoreTreeNode>> DataChangeListenerRegistration registerDataChangeListener(final InstanceIdentifier path, + public synchronized >> DataChangeListenerRegistration registerDataChangeListener(final InstanceIdentifier path, final L listener, final DataChangeScope scope) { DataChangeListenerRegistration listenerReg = new DataChangeListenerRegistration(path,listener, scope, this); listeners.add(listenerReg); + LOG.debug("Listener {} registered", listener); return listenerReg; } - private void removeListener(final DataChangeListenerRegistration listener) { + private synchronized void removeListener(final DataChangeListenerRegistration listener) { listeners.remove(listener); + LOG.debug("Listener {} unregistered", listener); removeThisIfUnused(); }