import static com.google.common.base.Preconditions.checkState;
import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
+import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
private final ListeningExecutorService executor;
private final String name;
private final AtomicLong txCounter = new AtomicLong(0);
-
- private DataAndMetadataSnapshot snapshot;
- private ModificationApplyOperation operationTree;
private final ListenerRegistrationNode listenerTree;
+ private final AtomicReference<DataAndMetadataSnapshot> snapshot;
-
+ private ModificationApplyOperation operationTree;
private SchemaContext schemaContext;
public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
- this.executor = executor;
- this.name = name;
- this.operationTree = new AllwaysFailOperation();
- this.snapshot = DataAndMetadataSnapshot.createEmpty();
+ this.name = Preconditions.checkNotNull(name);
+ this.executor = Preconditions.checkNotNull(executor);
this.listenerTree = ListenerRegistrationNode.createRoot();
+ this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
+ this.operationTree = new AlwaysFailOperation();
}
@Override
- public String getIdentifier() {
+ public final String getIdentifier() {
return name;
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
+ return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+ return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
+ return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
}
@Override
@Override
public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
-
- Optional<ListenerRegistrationNode> listenerNode = TreeNodeUtils.findNode(listenerTree, path);
- checkState(listenerNode.isPresent());
- synchronized (listener) {
- notifyInitialState(path, listener);
+ LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
+ ListenerRegistrationNode listenerNode = listenerTree;
+ for(PathArgument arg : path.getPath()) {
+ listenerNode = listenerNode.ensureChild(arg);
}
- return listenerNode.get().registerDataChangeListener(listener, scope);
- }
- private void notifyInitialState(final InstanceIdentifier path,
- final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
- Optional<StoreMetadataNode> currentState = snapshot.read(path);
- try {
+ /*
+ * Make sure commit is not occurring right now. Listener has to be registered and its
+ * state capture enqueued at a consistent point.
+ *
+ * FIXME: improve this to read-write lock, such that multiple listener registrations
+ * can occur simultaneously
+ */
+ final DataChangeListenerRegistration<L> reg;
+ synchronized (this) {
+ reg = listenerNode.registerDataChangeListener(path, listener, scope);
+
+ Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
if (currentState.isPresent()) {
- NormalizedNode<?, ?> data = currentState.get().getData();
- listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
+ final NormalizedNode<?, ?> data = currentState.get().getData();
+
+ final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
.setAfter(data) //
.addCreated(path, data) //
- .build() //
- );
+ .build();
+ executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
}
- } catch (Exception e) {
- LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
}
+ return reg;
}
private synchronized DOMStoreThreePhaseCommitCohort submit(
return name + "-" + txCounter.getAndIncrement();
}
- private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
+ private void commit(final DataAndMetadataSnapshot currentSnapshot,
final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
+
if(LOG.isTraceEnabled()) {
LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
}
- checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
- snapshot = DataAndMetadataSnapshot.builder() //
+
+ final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
.setMetadataTree(newDataTree) //
.setSchemaContext(schemaContext) //
.build();
- for(ChangeListenerNotifyTask task : listenerTasks) {
- executor.submit(task);
- }
+ /*
+ * The commit has to occur atomically with regard to listener registrations.
+ */
+ synchronized (this) {
+ final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
+ checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
+ for (ChangeListenerNotifyTask task : listenerTasks) {
+ executor.submit(task);
+ }
+ }
}
private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
@Override
public ListenableFuture<Boolean> canCommit() {
- final DataAndMetadataSnapshot snapshotCapture = snapshot;
+ final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
final ModificationApplyOperation snapshotOperation = operationTree;
return executor.submit(new Callable<Boolean>() {
@Override
public ListenableFuture<Void> preCommit() {
- storeSnapshot = snapshot;
+ storeSnapshot = snapshot.get();
if(modification.getModificationType() == ModificationType.UNMODIFIED) {
return Futures.immediateFuture(null);
}
proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
increase(metadataTree.getSubtreeVersion()));
-
listenerTasks = DataChangeEventResolver.create() //
.setRootPath(PUBLIC_ROOT_PATH) //
.setBeforeRoot(Optional.of(metadataTree)) //
}
- private class AllwaysFailOperation implements ModificationApplyOperation {
+ private static final class AlwaysFailOperation implements ModificationApplyOperation {
@Override
public Optional<StoreMetadataNode> apply(final NodeModification modification,