X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fstore%2Fimpl%2FInMemoryDOMDataStore.java;h=2091913f24949106aae128e07083a4354b992b70;hb=0692e55fe3536ab24a3f6c20e6f96dcd04315d9a;hp=39299ab1bdaefa1cea0626c60b7c7a697c46d9fd;hpb=af4995552e842e052a085ed0845bfda97f5fe668;p=controller.git diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index 39299ab1bd..2091913f24 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -11,15 +11,18 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType; import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification; import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode; -import org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; @@ -37,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -51,41 +55,39 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch private final ListeningExecutorService executor; private final String name; private final AtomicLong txCounter = new AtomicLong(0); - - private DataAndMetadataSnapshot snapshot; - private ModificationApplyOperation operationTree; private final ListenerRegistrationNode listenerTree; + private final AtomicReference snapshot; - + private ModificationApplyOperation operationTree; private SchemaContext schemaContext; public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) { - this.executor = executor; - this.name = name; - this.operationTree = new AllwaysFailOperation(); - this.snapshot = DataAndMetadataSnapshot.createEmpty(); + this.name = Preconditions.checkNotNull(name); + this.executor = Preconditions.checkNotNull(executor); this.listenerTree = ListenerRegistrationNode.createRoot(); + this.snapshot = new AtomicReference(DataAndMetadataSnapshot.createEmpty()); + this.operationTree = new AlwaysFailOperation(); } @Override - public String getIdentifier() { + public final String getIdentifier() { return name; } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot); + return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get()); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree); + return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree); + return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree); } @Override @@ -97,55 +99,72 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public >> ListenerRegistration registerChangeListener( final InstanceIdentifier path, final L listener, final DataChangeScope scope) { - - Optional listenerNode = TreeNodeUtils.findNode(listenerTree, path); - checkState(listenerNode.isPresent()); - synchronized (listener) { - notifyInitialState(path, listener); + LOG.debug("{}: Registering data change listener {} for {}",name,listener,path); + ListenerRegistrationNode listenerNode = listenerTree; + for(PathArgument arg : path.getPath()) { + listenerNode = listenerNode.ensureChild(arg); } - return listenerNode.get().registerDataChangeListener(listener, scope); - } - private void notifyInitialState(final InstanceIdentifier path, - final AsyncDataChangeListener> listener) { - Optional currentState = snapshot.read(path); - try { + /* + * Make sure commit is not occurring right now. Listener has to be registered and its + * state capture enqueued at a consistent point. + * + * FIXME: improve this to read-write lock, such that multiple listener registrations + * can occur simultaneously + */ + final DataChangeListenerRegistration reg; + synchronized (this) { + reg = listenerNode.registerDataChangeListener(path, listener, scope); + + Optional currentState = snapshot.get().read(path); if (currentState.isPresent()) { - NormalizedNode data = currentState.get().getData(); - listener.onDataChanged(DOMImmutableDataChangeEvent.builder() // + final NormalizedNode data = currentState.get().getData(); + + final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() // .setAfter(data) // .addCreated(path, data) // - .build() // - ); + .build(); + executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event)); } - } catch (Exception e) { - LOG.error("Unhandled exception encountered when invoking listener {}", listener, e); } + return reg; } private synchronized DOMStoreThreePhaseCommitCohort submit( - final SnaphostBackedWriteTransaction snaphostBackedWriteTransaction) { - return new ThreePhaseCommitImpl(snaphostBackedWriteTransaction); + final SnaphostBackedWriteTransaction writeTx) { + LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView()); + return new ThreePhaseCommitImpl(writeTx); } private Object nextIdentifier() { return name + "-" + txCounter.getAndIncrement(); } - private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot, + private void commit(final DataAndMetadataSnapshot currentSnapshot, final StoreMetadataNode newDataTree, final Iterable listenerTasks) { LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion()); - checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs"); - snapshot = DataAndMetadataSnapshot.builder() // + + if(LOG.isTraceEnabled()) { + LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree)); + } + + final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() // .setMetadataTree(newDataTree) // .setSchemaContext(schemaContext) // .build(); - for(ChangeListenerNotifyTask task : listenerTasks) { - executor.submit(task); - } + /* + * The commit has to occur atomically with regard to listener registrations. + */ + synchronized (this) { + final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot); + checkState(success, "Store snapshot and transaction snapshot differ. This should never happen."); + for (ChangeListenerNotifyTask task : listenerTasks) { + executor.submit(task); + } + } } private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction { @@ -156,6 +175,8 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) { this.identifier = identifier; this.stableSnapshot = snapshot; + LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion()); + } @Override @@ -195,6 +216,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch this.identifier = identifier; mutableTree = MutableDataTree.from(snapshot, applyOper); this.store = store; + LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion()); } @Override @@ -283,7 +305,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture canCommit() { - final DataAndMetadataSnapshot snapshotCapture = snapshot; + final DataAndMetadataSnapshot snapshotCapture = snapshot.get(); final ModificationApplyOperation snapshotOperation = operationTree; return executor.submit(new Callable() { @@ -300,7 +322,10 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture preCommit() { - storeSnapshot = snapshot; + storeSnapshot = snapshot.get(); + if(modification.getModificationType() == ModificationType.UNMODIFIED) { + return Futures.immediateFuture(null); + } return executor.submit(new Callable() { @@ -312,7 +337,6 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree), increase(metadataTree.getSubtreeVersion())); - listenerTasks = DataChangeEventResolver.create() // .setRootPath(PUBLIC_ROOT_PATH) // .setBeforeRoot(Optional.of(metadataTree)) // @@ -335,8 +359,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture commit() { - checkState(proposedSubtree != null); - checkState(storeSnapshot != null); + if(modification.getModificationType() == ModificationType.UNMODIFIED) { + return Futures.immediateFuture(null); + } + + checkState(proposedSubtree != null,"Proposed subtree must be computed"); + checkState(storeSnapshot != null,"Proposed subtree must be computed"); // return ImmediateFuture<>; InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks); return Futures. immediateFuture(null); @@ -344,7 +372,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch } - private class AllwaysFailOperation implements ModificationApplyOperation { + private static final class AlwaysFailOperation implements ModificationApplyOperation { @Override public Optional apply(final NodeModification modification,