X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fstore%2Fimpl%2FInMemoryDOMDataStore.java;h=0cd00cad7e6ec9609a03bac044f1ac613edf32c2;hp=4d5b8ee906a07640f5be07bfb8a3173b57a66bb9;hb=08217531fbe76dbcc429c71d593894fc211e50aa;hpb=bcd020ecbeeacc97df1717a238d93bacd87bcbfc diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index 4d5b8ee906..0cd00cad7e 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -16,6 +16,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType; import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification; import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode; import org.opendaylight.controller.sal.core.spi.data.DOMStore; @@ -43,22 +45,27 @@ import com.google.common.util.concurrent.ListeningExecutorService; public class InMemoryDOMDataStore implements DOMStore, Identifiable, SchemaContextListener { private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class); + private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build(); + + private final ListeningExecutorService executor; + private final String name; private final AtomicLong txCounter = new AtomicLong(0); private DataAndMetadataSnapshot snapshot; - private ModificationApplyOperation operation; + private ModificationApplyOperation operationTree; + private final ListenerRegistrationNode listenerTree; + - private final ListeningExecutorService executor; - private final String name; private SchemaContext schemaContext; public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) { this.executor = executor; this.name = name; - this.operation = new AllwaysFailOperation(); + this.operationTree = new AllwaysFailOperation(); this.snapshot = DataAndMetadataSnapshot.createEmpty(); + this.listenerTree = ListenerRegistrationNode.createRoot(); } @Override @@ -73,35 +80,81 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operation); + return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operation); + return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree); } @Override public synchronized void onGlobalContextUpdated(final SchemaContext ctx) { - operation = SchemaAwareApplyOperationRoot.from(ctx); + operationTree = SchemaAwareApplyOperationRoot.from(ctx); schemaContext = ctx; } @Override public >> ListenerRegistration registerChangeListener( final InstanceIdentifier path, final L listener, final DataChangeScope scope) { - return null; + LOG.debug("{}: Registering data change listener {} for {}",name,listener,path); + ListenerRegistrationNode listenerNode = listenerTree; + for(PathArgument arg :path.getPath()) { + listenerNode = listenerNode.ensureChild(arg); + } + synchronized (listener) { + notifyInitialState(path, listener); + } + return listenerNode.registerDataChangeListener(path,listener, scope); + } + + private void notifyInitialState(final InstanceIdentifier path, + final AsyncDataChangeListener> listener) { + Optional currentState = snapshot.read(path); + try { + if (currentState.isPresent()) { + NormalizedNode data = currentState.get().getData(); + listener.onDataChanged(DOMImmutableDataChangeEvent.builder() // + .setAfter(data) // + .addCreated(path, data) // + .build() // + ); + } + } catch (Exception e) { + LOG.error("Unhandled exception encountered when invoking listener {}", listener, e); + } + } private synchronized DOMStoreThreePhaseCommitCohort submit( - final SnaphostBackedWriteTransaction snaphostBackedWriteTransaction) { - return new ThreePhaseCommitImpl(snaphostBackedWriteTransaction); + final SnaphostBackedWriteTransaction writeTx) { + LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView()); + return new ThreePhaseCommitImpl(writeTx); } private Object nextIdentifier() { return name + "-" + txCounter.getAndIncrement(); } + private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot, + final StoreMetadataNode newDataTree, final Iterable listenerTasks) { + LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion()); + + if(LOG.isTraceEnabled()) { + LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree)); + } + checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs"); + snapshot = DataAndMetadataSnapshot.builder() // + .setMetadataTree(newDataTree) // + .setSchemaContext(schemaContext) // + .build(); + + for(ChangeListenerNotifyTask task : listenerTasks) { + executor.submit(task); + } + + } + private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction { private DataAndMetadataSnapshot stableSnapshot; @@ -110,6 +163,8 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) { this.identifier = identifier; this.stableSnapshot = snapshot; + LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion()); + } @Override @@ -149,6 +204,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch this.identifier = identifier; mutableTree = MutableDataTree.from(snapshot, applyOper); this.store = store; + LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion()); } @Override @@ -185,7 +241,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public synchronized DOMStoreThreePhaseCommitCohort ready() { ready = true; - LOG.debug("Store transaction: {} : Ready",getIdentifier()); + LOG.debug("Store transaction: {} : Ready", getIdentifier()); mutableTree.seal(); return store.submit(this); } @@ -228,6 +284,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch private DataAndMetadataSnapshot storeSnapshot; private Optional proposedSubtree; + private Iterable listenerTasks; public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) { this.transaction = writeTransaction; @@ -237,14 +294,15 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture canCommit() { final DataAndMetadataSnapshot snapshotCapture = snapshot; - final ModificationApplyOperation snapshotOperation = operation; + final ModificationApplyOperation snapshotOperation = operationTree; return executor.submit(new Callable() { @Override public Boolean call() throws Exception { - boolean applicable = snapshotOperation.isApplicable(modification, Optional.of(snapshotCapture.getMetadataTree())); - LOG.debug("Store Transcation: {} : canCommit : {}",transaction.getIdentifier(),applicable); + boolean applicable = snapshotOperation.isApplicable(modification, + Optional.of(snapshotCapture.getMetadataTree())); + LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable); return applicable; } }); @@ -253,12 +311,28 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture preCommit() { storeSnapshot = snapshot; + if(modification.getModificationType() == ModificationType.UNMODIFIED) { + return Futures.immediateFuture(null); + } return executor.submit(new Callable() { + + @Override public Void call() throws Exception { StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree(); - proposedSubtree = operation.apply(modification, Optional.of(metadataTree),increase(metadataTree.getSubtreeVersion())); + + proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree), + increase(metadataTree.getSubtreeVersion())); + + listenerTasks = DataChangeEventResolver.create() // + .setRootPath(PUBLIC_ROOT_PATH) // + .setBeforeRoot(Optional.of(metadataTree)) // + .setAfterRoot(proposedSubtree) // + .setModificationRoot(modification) // + .setListenerRoot(listenerTree) // + .resolve(); + return null; } }); @@ -273,28 +347,24 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture commit() { - checkState(proposedSubtree != null); - checkState(storeSnapshot != null); + if(modification.getModificationType() == ModificationType.UNMODIFIED) { + return Futures.immediateFuture(null); + } + + checkState(proposedSubtree != null,"Proposed subtree must be computed"); + checkState(storeSnapshot != null,"Proposed subtree must be computed"); // return ImmediateFuture<>; - InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree); + InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks); return Futures. immediateFuture(null); } } - private synchronized void commit(final DataAndMetadataSnapshot storeSnapshot, - final Optional proposedSubtree) { - //LOG.info("Updating Store snaphot."); - checkState(snapshot == storeSnapshot, "Store snapshot and transaction snapshot differs"); - snapshot = DataAndMetadataSnapshot.builder().setMetadataTree(proposedSubtree.get()) - .setSchemaContext(schemaContext).build(); - } - private class AllwaysFailOperation implements ModificationApplyOperation { @Override public Optional apply(final NodeModification modification, - final Optional storeMeta,final UnsignedLong subtreeVersion) { + final Optional storeMeta, final UnsignedLong subtreeVersion) { throw new IllegalStateException("Schema Context is not available."); }