import static com.google.common.base.Preconditions.checkState;
import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
+import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
private final ListeningExecutorService executor;
private final String name;
private final AtomicLong txCounter = new AtomicLong(0);
-
- private DataAndMetadataSnapshot snapshot;
- private ModificationApplyOperation operationTree;
private final ListenerRegistrationNode listenerTree;
+ private final AtomicReference<DataAndMetadataSnapshot> snapshot;
-
+ private ModificationApplyOperation operationTree;
private SchemaContext schemaContext;
public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
this.name = Preconditions.checkNotNull(name);
this.executor = Preconditions.checkNotNull(executor);
- this.operationTree = new AlwaysFailOperation();
- this.snapshot = DataAndMetadataSnapshot.createEmpty();
this.listenerTree = ListenerRegistrationNode.createRoot();
+ this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
+ this.operationTree = new AlwaysFailOperation();
}
@Override
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
+ return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+ return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+ return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
}
@Override
final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
ListenerRegistrationNode listenerNode = listenerTree;
- for(PathArgument arg :path.getPath()) {
+ for(PathArgument arg : path.getPath()) {
listenerNode = listenerNode.ensureChild(arg);
}
- synchronized (listener) {
- notifyInitialState(path, listener);
- }
- return listenerNode.registerDataChangeListener(path,listener, scope);
- }
- private void notifyInitialState(final InstanceIdentifier path,
- final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
- Optional<StoreMetadataNode> currentState = snapshot.read(path);
- try {
+ /*
+ * Make sure commit is not occurring right now. Listener has to be registered and its
+ * state capture enqueued at a consistent point.
+ *
+ * FIXME: improve this to read-write lock, such that multiple listener registrations
+ * can occur simultaneously
+ */
+ final DataChangeListenerRegistration<L> reg;
+ synchronized (this) {
+ reg = listenerNode.registerDataChangeListener(path, listener, scope);
+
+ Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
if (currentState.isPresent()) {
- NormalizedNode<?, ?> data = currentState.get().getData();
- listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
+ final NormalizedNode<?, ?> data = currentState.get().getData();
+
+ final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
.setAfter(data) //
.addCreated(path, data) //
- .build() //
- );
+ .build();
+ executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
}
- } catch (Exception e) {
- LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
}
+ return reg;
}
private synchronized DOMStoreThreePhaseCommitCohort submit(
return name + "-" + txCounter.getAndIncrement();
}
- private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
+ private void commit(final DataAndMetadataSnapshot currentSnapshot,
final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
if(LOG.isTraceEnabled()) {
LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
}
- checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
- snapshot = DataAndMetadataSnapshot.builder() //
+
+ final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
.setMetadataTree(newDataTree) //
.setSchemaContext(schemaContext) //
.build();
- for(ChangeListenerNotifyTask task : listenerTasks) {
- executor.submit(task);
- }
+ /*
+ * The commit has to occur atomically with regard to listener registrations.
+ */
+ synchronized (this) {
+ final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
+ checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
+ for (ChangeListenerNotifyTask task : listenerTasks) {
+ executor.submit(task);
+ }
+ }
}
private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
@Override
public ListenableFuture<Boolean> canCommit() {
- final DataAndMetadataSnapshot snapshotCapture = snapshot;
+ final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
final ModificationApplyOperation snapshotOperation = operationTree;
return executor.submit(new Callable<Boolean>() {
@Override
public ListenableFuture<Void> preCommit() {
- storeSnapshot = snapshot;
+ storeSnapshot = snapshot.get();
if(modification.getModificationType() == ModificationType.UNMODIFIED) {
return Futures.immediateFuture(null);
}
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrationNode>, Identifiable<PathArgument> {
- private final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class);
private final ListenerRegistrationNode parent;
private final Map<PathArgument, ListenerRegistrationNode> children;
@SuppressWarnings({ "rawtypes", "unchecked" })
public Collection<org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration<?>> getListeners() {
+ // FIXME: this is not thread-safe and races with listener (un)registration!
return (Collection) listeners;
}
* @param scope Scope of triggering event.
* @return
*/
- public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
+ public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
final L listener, final DataChangeScope scope) {
DataChangeListenerRegistration<L> listenerReg = new DataChangeListenerRegistration<L>(path,listener, scope, this);