X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fstore%2Fimpl%2FInMemoryDOMDataStore.java;h=a854c4806b4a61f8f3d52716e23f978503465a16;hb=4f8e371e1b7f6a2aa31115407c3f37738030f4c5;hp=30c6ac4e7fc94bbc45b67519075246967671f182;hpb=f9a5557ea91bc6c877453583e17adf1606b58290;p=controller.git diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index 30c6ac4e7f..10b838a2c6 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -7,30 +7,35 @@ */ package org.opendaylight.controller.md.sal.dom.store.impl; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.GuardedBy; + import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode; -import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType; -import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification; -import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode; +import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataPreconditionFailedException; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTree; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeCandidate; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeModification; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeSnapshot; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.data.InMemoryDataTreeFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.slf4j.Logger; @@ -38,35 +43,32 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -public class InMemoryDOMDataStore implements DOMStore, Identifiable, SchemaContextListener { - +/** + * In-memory DOM Data Store + * + * Implementation of {@link DOMStore} which uses {@link DataTree} and other + * classes such as {@link SnapshotBackedWriteTransaction}. + * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask} + * to implement {@link DOMStore} contract. + * + */ +public class InMemoryDOMDataStore implements DOMStore, Identifiable, SchemaContextListener, + TransactionReadyPrototype { private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class); - private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build(); - - + private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(); + private final ListenerTree listenerTree = ListenerTree.create(); + private final AtomicLong txCounter = new AtomicLong(0); private final ListeningExecutorService executor; private final String name; - private final AtomicLong txCounter = new AtomicLong(0); - - private DataAndMetadataSnapshot snapshot; - private ModificationApplyOperation operationTree; - private final ListenerRegistrationNode listenerTree; - - - - private SchemaContext schemaContext; public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) { this.name = Preconditions.checkNotNull(name); this.executor = Preconditions.checkNotNull(executor); - this.operationTree = new AlwaysFailOperation(); - this.snapshot = DataAndMetadataSnapshot.createEmpty(); - this.listenerTree = ListenerRegistrationNode.createRoot(); } @Override @@ -76,60 +78,71 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot); + return new SnapshotBackedReadTransaction(nextIdentifier(), dataTree.takeSnapshot()); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree); + return new SnapshotBackedReadWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree); + return new SnapshotBackedWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this); + } + + @Override + public DOMStoreTransactionChain createTransactionChain() { + return new DOMStoreTransactionChainImpl(); } @Override public synchronized void onGlobalContextUpdated(final SchemaContext ctx) { - operationTree = SchemaAwareApplyOperationRoot.from(ctx); - schemaContext = ctx; + dataTree.setSchemaContext(ctx); } @Override public >> ListenerRegistration registerChangeListener( final InstanceIdentifier path, final L listener, final DataChangeScope scope) { - LOG.debug("{}: Registering data change listener {} for {}",name,listener,path); - ListenerRegistrationNode listenerNode = listenerTree; - for(PathArgument arg :path.getPath()) { - listenerNode = listenerNode.ensureChild(arg); - } - synchronized (listener) { - notifyInitialState(path, listener); - } - return listenerNode.registerDataChangeListener(path,listener, scope); - } - private void notifyInitialState(final InstanceIdentifier path, - final AsyncDataChangeListener> listener) { - Optional currentState = snapshot.read(path); - try { + /* + * Make sure commit is not occurring right now. Listener has to be + * registered and its state capture enqueued at a consistent point. + * + * FIXME: improve this to read-write lock, such that multiple listener + * registrations can occur simultaneously + */ + final DataChangeListenerRegistration reg; + synchronized (this) { + LOG.debug("{}: Registering data change listener {} for {}", name, listener, path); + + reg = listenerTree.registerDataChangeListener(path, listener, scope); + + Optional> currentState = dataTree.takeSnapshot().readNode(path); if (currentState.isPresent()) { - NormalizedNode data = currentState.get().getData(); - listener.onDataChanged(DOMImmutableDataChangeEvent.builder() // + final NormalizedNode data = currentState.get(); + + final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) // .setAfter(data) // .addCreated(path, data) // - .build() // - ); + .build(); + executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event)); } - } catch (Exception e) { - LOG.error("Unhandled exception encountered when invoking listener {}", listener, e); } + return new AbstractListenerRegistration(listener) { + @Override + protected void removeRegistration() { + synchronized (InMemoryDOMDataStore.this) { + reg.close(); + } + } + }; } - private synchronized DOMStoreThreePhaseCommitCohort submit( - final SnaphostBackedWriteTransaction writeTx) { - LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView()); + @Override + public synchronized DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction writeTx) { + LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView()); return new ThreePhaseCommitImpl(writeTx); } @@ -137,203 +150,179 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch return name + "-" + txCounter.getAndIncrement(); } - private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot, - final StoreMetadataNode newDataTree, final Iterable listenerTasks) { - LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion()); - - if(LOG.isTraceEnabled()) { - LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree)); - } - checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs"); - snapshot = DataAndMetadataSnapshot.builder() // - .setMetadataTree(newDataTree) // - .setSchemaContext(schemaContext) // - .build(); - - for(ChangeListenerNotifyTask task : listenerTasks) { - executor.submit(task); - } - - } - - private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction { + private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype { - private DataAndMetadataSnapshot stableSnapshot; - private final Object identifier; + @GuardedBy("this") + private SnapshotBackedWriteTransaction latestOutstandingTx; - public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) { - this.identifier = identifier; - this.stableSnapshot = snapshot; - LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion()); + private boolean chainFailed = false; + private void checkFailed() { + Preconditions.checkState(!chainFailed, "Transaction chain is failed."); } @Override - public Object getIdentifier() { - return identifier; + public synchronized DOMStoreReadTransaction newReadOnlyTransaction() { + final DataTreeSnapshot snapshot; + checkFailed(); + if (latestOutstandingTx != null) { + checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready."); + snapshot = latestOutstandingTx.getMutatedView(); + } else { + snapshot = dataTree.takeSnapshot(); + } + return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot); } @Override - public void close() { - stableSnapshot = null; + public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() { + final DataTreeSnapshot snapshot; + checkFailed(); + if (latestOutstandingTx != null) { + checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready."); + snapshot = latestOutstandingTx.getMutatedView(); + } else { + snapshot = dataTree.takeSnapshot(); + } + final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(), + snapshot, this); + latestOutstandingTx = ret; + return ret; } @Override - public ListenableFuture>> read(final InstanceIdentifier path) { - checkNotNull(path, "Path must not be null."); - checkState(stableSnapshot != null, "Transaction is closed"); - return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path)); + public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() { + final DataTreeSnapshot snapshot; + checkFailed(); + if (latestOutstandingTx != null) { + checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready."); + snapshot = latestOutstandingTx.getMutatedView(); + } else { + snapshot = dataTree.takeSnapshot(); + } + final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot, + this); + latestOutstandingTx = ret; + return ret; } @Override - public String toString() { - return "SnapshotBackedReadTransaction [id =" + identifier + "]"; + public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) { + DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx); + return new ChainedTransactionCommitImpl(tx, storeCohort, this); } - } - - private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction { - - private MutableDataTree mutableTree; - private final Object identifier; - private InMemoryDOMDataStore store; - - private boolean ready = false; + @Override + public void close() { - public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot, - final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) { - this.identifier = identifier; - mutableTree = MutableDataTree.from(snapshot, applyOper); - this.store = store; - LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion()); } - @Override - public Object getIdentifier() { - return identifier; - } + protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, + final Throwable t) { + chainFailed = true; - @Override - public void close() { - this.mutableTree = null; - this.store = null; } - @Override - public void write(final InstanceIdentifier path, final NormalizedNode data) { - checkNotReady(); - mutableTree.write(path, data); + public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) { + // If commited transaction is latestOutstandingTx we clear + // latestOutstandingTx + // field in order to base new transactions on Datastore Data Tree + // directly. + if (transaction.equals(latestOutstandingTx)) { + latestOutstandingTx = null; + } } - @Override - public void delete(final InstanceIdentifier path) { - checkNotReady(); - mutableTree.delete(path); - } + } - protected boolean isReady() { - return ready; - } + private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort { - protected void checkNotReady() { - checkState(!ready, "Transaction is ready. No further modifications allowed."); - } + private final SnapshotBackedWriteTransaction transaction; + private final DOMStoreThreePhaseCommitCohort delegate; - @Override - public synchronized DOMStoreThreePhaseCommitCohort ready() { - ready = true; - LOG.debug("Store transaction: {} : Ready", getIdentifier()); - mutableTree.seal(); - return store.submit(this); - } + private final DOMStoreTransactionChainImpl txChain; - protected MutableDataTree getMutatedView() { - return mutableTree; + protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction, + final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) { + super(); + this.transaction = transaction; + this.delegate = delegate; + this.txChain = txChain; } @Override - public String toString() { - return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]"; + public ListenableFuture canCommit() { + return delegate.canCommit(); } - } - - private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements - DOMStoreReadWriteTransaction { - - protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot, - final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) { - super(identifier, snapshot, store, applyOper); + @Override + public ListenableFuture preCommit() { + return delegate.preCommit(); } @Override - public ListenableFuture>> read(final InstanceIdentifier path) { - return Futures.immediateFuture(getMutatedView().read(path)); + public ListenableFuture abort() { + return delegate.abort(); } @Override - public String toString() { - return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]"; + public ListenableFuture commit() { + ListenableFuture commitFuture = delegate.commit(); + Futures.addCallback(commitFuture, new FutureCallback() { + @Override + public void onFailure(final Throwable t) { + txChain.onTransactionFailed(transaction, t); + } + + @Override + public void onSuccess(final Void result) { + txChain.onTransactionCommited(transaction); + } + + }); + return commitFuture; } } private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort { - private final SnaphostBackedWriteTransaction transaction; - private final NodeModification modification; + private final SnapshotBackedWriteTransaction transaction; + private final DataTreeModification modification; - private DataAndMetadataSnapshot storeSnapshot; - private Optional proposedSubtree; - private Iterable listenerTasks; + private ResolveDataChangeEventsTask listenerResolver; + private DataTreeCandidate candidate; - public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) { + public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) { this.transaction = writeTransaction; - this.modification = transaction.getMutatedView().getRootModification(); + this.modification = transaction.getMutatedView(); } @Override public ListenableFuture canCommit() { - final DataAndMetadataSnapshot snapshotCapture = snapshot; - final ModificationApplyOperation snapshotOperation = operationTree; - return executor.submit(new Callable() { - @Override - public Boolean call() throws Exception { - boolean applicable = snapshotOperation.isApplicable(modification, - Optional.of(snapshotCapture.getMetadataTree())); - LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable); - return applicable; + public Boolean call() { + try { + dataTree.validate(modification); + LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier()); + return true; + } catch (DataPreconditionFailedException e) { + LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(), + e.getPath(), e); + return false; + } } }); } @Override public ListenableFuture preCommit() { - storeSnapshot = snapshot; - if(modification.getModificationType() == ModificationType.UNMODIFIED) { - return Futures.immediateFuture(null); - } return executor.submit(new Callable() { - - - @Override - public Void call() throws Exception { - StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree(); - - proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree), - increase(metadataTree.getSubtreeVersion())); - - listenerTasks = DataChangeEventResolver.create() // - .setRootPath(PUBLIC_ROOT_PATH) // - .setBeforeRoot(Optional.of(metadataTree)) // - .setAfterRoot(proposedSubtree) // - .setModificationRoot(modification) // - .setListenerRoot(listenerTree) // - .resolve(); - + public Void call() { + candidate = dataTree.prepare(modification); + listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree); return null; } }); @@ -341,48 +330,28 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture abort() { - storeSnapshot = null; - proposedSubtree = null; - return Futures. immediateFuture(null); + candidate = null; + return Futures.immediateFuture(null); } @Override public ListenableFuture commit() { - if(modification.getModificationType() == ModificationType.UNMODIFIED) { - return Futures.immediateFuture(null); + checkState(candidate != null, "Proposed subtree must be computed"); + + /* + * The commit has to occur atomically with regard to listener + * registrations. + */ + synchronized (this) { + dataTree.commit(candidate); + + for (ChangeListenerNotifyTask task : listenerResolver.call()) { + LOG.trace("Scheduling invocation of listeners: {}", task); + executor.submit(task); + } } - checkState(proposedSubtree != null,"Proposed subtree must be computed"); - checkState(storeSnapshot != null,"Proposed subtree must be computed"); - // return ImmediateFuture<>; - InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks); - return Futures. immediateFuture(null); + return Futures.immediateFuture(null); } - - } - - private static final class AlwaysFailOperation implements ModificationApplyOperation { - - @Override - public Optional apply(final NodeModification modification, - final Optional storeMeta, final UnsignedLong subtreeVersion) { - throw new IllegalStateException("Schema Context is not available."); - } - - @Override - public boolean isApplicable(final NodeModification modification, final Optional storeMetadata) { - throw new IllegalStateException("Schema Context is not available."); - } - - @Override - public Optional getChild(final PathArgument child) { - throw new IllegalStateException("Schema Context is not available."); - } - - @Override - public void verifyStructure(final NodeModification modification) throws IllegalArgumentException { - throw new IllegalStateException("Schema Context is not available."); - } - } }