summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
7e7bdae)
This wraps the datastore root in an AtomicReference, which makes sure
started transactions see the latest published commit. This will make
the datastore handling more robust under tight conditions.
Also uncovers the fact that we are invoking user code under lock, which
we fix by reusing the executor used by the commit machinery.
Finally it uncovers thread-unsafe listener list manipuation. This will
need to be addressed in a follow-up patch.
Change-Id: Ic7efd266ef680701c1f0944ee675122d8527568b
Signed-off-by: Robert Varga <rovarga@cisco.com>
class ChangeListenerNotifyTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
class ChangeListenerNotifyTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
- private final Iterable<DataChangeListenerRegistration<?>> listeners;
+ private final Iterable<? extends DataChangeListenerRegistration<?>> listeners;
private final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event;
private final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event;
- public ChangeListenerNotifyTask(final Iterable<DataChangeListenerRegistration<?>> listeners,
+ public ChangeListenerNotifyTask(final Iterable<? extends DataChangeListenerRegistration<?>> listeners,
final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event) {
this.listeners = listeners;
this.event = event;
final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event) {
this.listeners = listeners;
this.event = event;
import static com.google.common.base.Preconditions.checkState;
import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
import static com.google.common.base.Preconditions.checkState;
import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
+import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
private final ListeningExecutorService executor;
private final String name;
private final AtomicLong txCounter = new AtomicLong(0);
private final ListeningExecutorService executor;
private final String name;
private final AtomicLong txCounter = new AtomicLong(0);
-
- private DataAndMetadataSnapshot snapshot;
- private ModificationApplyOperation operationTree;
private final ListenerRegistrationNode listenerTree;
private final ListenerRegistrationNode listenerTree;
+ private final AtomicReference<DataAndMetadataSnapshot> snapshot;
+ private ModificationApplyOperation operationTree;
private SchemaContext schemaContext;
public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
this.name = Preconditions.checkNotNull(name);
this.executor = Preconditions.checkNotNull(executor);
private SchemaContext schemaContext;
public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
this.name = Preconditions.checkNotNull(name);
this.executor = Preconditions.checkNotNull(executor);
- this.operationTree = new AlwaysFailOperation();
- this.snapshot = DataAndMetadataSnapshot.createEmpty();
this.listenerTree = ListenerRegistrationNode.createRoot();
this.listenerTree = ListenerRegistrationNode.createRoot();
+ this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
+ this.operationTree = new AlwaysFailOperation();
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
+ return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+ return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+ return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
ListenerRegistrationNode listenerNode = listenerTree;
final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
ListenerRegistrationNode listenerNode = listenerTree;
- for(PathArgument arg :path.getPath()) {
+ for(PathArgument arg : path.getPath()) {
listenerNode = listenerNode.ensureChild(arg);
}
listenerNode = listenerNode.ensureChild(arg);
}
- synchronized (listener) {
- notifyInitialState(path, listener);
- }
- return listenerNode.registerDataChangeListener(path,listener, scope);
- }
- private void notifyInitialState(final InstanceIdentifier path,
- final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
- Optional<StoreMetadataNode> currentState = snapshot.read(path);
- try {
+ /*
+ * Make sure commit is not occurring right now. Listener has to be registered and its
+ * state capture enqueued at a consistent point.
+ *
+ * FIXME: improve this to read-write lock, such that multiple listener registrations
+ * can occur simultaneously
+ */
+ final DataChangeListenerRegistration<L> reg;
+ synchronized (this) {
+ reg = listenerNode.registerDataChangeListener(path, listener, scope);
+
+ Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
if (currentState.isPresent()) {
if (currentState.isPresent()) {
- NormalizedNode<?, ?> data = currentState.get().getData();
- listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
+ final NormalizedNode<?, ?> data = currentState.get().getData();
+
+ final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
.setAfter(data) //
.addCreated(path, data) //
.setAfter(data) //
.addCreated(path, data) //
+ .build();
+ executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
- } catch (Exception e) {
- LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
}
private synchronized DOMStoreThreePhaseCommitCohort submit(
}
private synchronized DOMStoreThreePhaseCommitCohort submit(
return name + "-" + txCounter.getAndIncrement();
}
return name + "-" + txCounter.getAndIncrement();
}
- private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
+ private void commit(final DataAndMetadataSnapshot currentSnapshot,
final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
if(LOG.isTraceEnabled()) {
LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
}
final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
if(LOG.isTraceEnabled()) {
LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
}
- checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
- snapshot = DataAndMetadataSnapshot.builder() //
+
+ final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
.setMetadataTree(newDataTree) //
.setSchemaContext(schemaContext) //
.build();
.setMetadataTree(newDataTree) //
.setSchemaContext(schemaContext) //
.build();
- for(ChangeListenerNotifyTask task : listenerTasks) {
- executor.submit(task);
- }
+ /*
+ * The commit has to occur atomically with regard to listener registrations.
+ */
+ synchronized (this) {
+ final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
+ checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
+ for (ChangeListenerNotifyTask task : listenerTasks) {
+ executor.submit(task);
+ }
+ }
}
private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
}
private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
@Override
public ListenableFuture<Boolean> canCommit() {
@Override
public ListenableFuture<Boolean> canCommit() {
- final DataAndMetadataSnapshot snapshotCapture = snapshot;
+ final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
final ModificationApplyOperation snapshotOperation = operationTree;
return executor.submit(new Callable<Boolean>() {
final ModificationApplyOperation snapshotOperation = operationTree;
return executor.submit(new Callable<Boolean>() {
@Override
public ListenableFuture<Void> preCommit() {
@Override
public ListenableFuture<Void> preCommit() {
- storeSnapshot = snapshot;
+ storeSnapshot = snapshot.get();
if(modification.getModificationType() == ModificationType.UNMODIFIED) {
return Futures.immediateFuture(null);
}
if(modification.getModificationType() == ModificationType.UNMODIFIED) {
return Futures.immediateFuture(null);
}
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrationNode>, Identifiable<PathArgument> {
public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrationNode>, Identifiable<PathArgument> {
- private final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class);
private final ListenerRegistrationNode parent;
private final Map<PathArgument, ListenerRegistrationNode> children;
private final ListenerRegistrationNode parent;
private final Map<PathArgument, ListenerRegistrationNode> children;
@SuppressWarnings({ "rawtypes", "unchecked" })
public Collection<org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration<?>> getListeners() {
@SuppressWarnings({ "rawtypes", "unchecked" })
public Collection<org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration<?>> getListeners() {
+ // FIXME: this is not thread-safe and races with listener (un)registration!
return (Collection) listeners;
}
return (Collection) listeners;
}
* @param scope Scope of triggering event.
* @return
*/
* @param scope Scope of triggering event.
* @return
*/
- public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
+ public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
final L listener, final DataChangeScope scope) {
DataChangeListenerRegistration<L> listenerReg = new DataChangeListenerRegistration<L>(path,listener, scope, this);
final L listener, final DataChangeScope scope) {
DataChangeListenerRegistration<L> listenerReg = new DataChangeListenerRegistration<L>(path,listener, scope, this);