X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fstore%2Fimpl%2FInMemoryDOMDataStore.java;h=005e3b772dc1259356e11d6722cd1744963f23d1;hp=0cd00cad7e6ec9609a03bac044f1ac613edf32c2;hb=8022ea0892ef0499580685b12bc130f29ae8bbbc;hpb=08217531fbe76dbcc429c71d593894fc211e50aa diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index 0cd00cad7e..005e3b772d 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -11,12 +11,14 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType; import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification; import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode; @@ -24,7 +26,9 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; @@ -36,7 +40,10 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Objects; +import com.google.common.base.Objects.ToStringHelper; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -47,45 +54,42 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class); private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build(); - private final ListeningExecutorService executor; private final String name; private final AtomicLong txCounter = new AtomicLong(0); + private final ListenerTree listenerTree; + private final AtomicReference snapshot; - private DataAndMetadataSnapshot snapshot; private ModificationApplyOperation operationTree; - private final ListenerRegistrationNode listenerTree; - - private SchemaContext schemaContext; public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) { - this.executor = executor; - this.name = name; - this.operationTree = new AllwaysFailOperation(); - this.snapshot = DataAndMetadataSnapshot.createEmpty(); - this.listenerTree = ListenerRegistrationNode.createRoot(); + this.name = Preconditions.checkNotNull(name); + this.executor = Preconditions.checkNotNull(executor); + this.listenerTree = ListenerTree.create(); + this.snapshot = new AtomicReference(DataAndMetadataSnapshot.createEmpty()); + this.operationTree = new AlwaysFailOperation(); } @Override - public String getIdentifier() { + public final String getIdentifier() { return name; } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot); + return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get()); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree); + return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree); + return new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree); } @Override @@ -97,38 +101,44 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public >> ListenerRegistration registerChangeListener( final InstanceIdentifier path, final L listener, final DataChangeScope scope) { - LOG.debug("{}: Registering data change listener {} for {}",name,listener,path); - ListenerRegistrationNode listenerNode = listenerTree; - for(PathArgument arg :path.getPath()) { - listenerNode = listenerNode.ensureChild(arg); - } - synchronized (listener) { - notifyInitialState(path, listener); - } - return listenerNode.registerDataChangeListener(path,listener, scope); - } - private void notifyInitialState(final InstanceIdentifier path, - final AsyncDataChangeListener> listener) { - Optional currentState = snapshot.read(path); - try { + /* + * Make sure commit is not occurring right now. Listener has to be + * registered and its state capture enqueued at a consistent point. + * + * FIXME: improve this to read-write lock, such that multiple listener + * registrations can occur simultaneously + */ + final DataChangeListenerRegistration reg; + synchronized (this) { + LOG.debug("{}: Registering data change listener {} for {}", name, listener, path); + + reg = listenerTree.registerDataChangeListener(path, listener, scope); + + Optional currentState = snapshot.get().read(path); if (currentState.isPresent()) { - NormalizedNode data = currentState.get().getData(); - listener.onDataChanged(DOMImmutableDataChangeEvent.builder() // + final NormalizedNode data = currentState.get().getData(); + + final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) // .setAfter(data) // .addCreated(path, data) // - .build() // - ); + .build(); + executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event)); } - } catch (Exception e) { - LOG.error("Unhandled exception encountered when invoking listener {}", listener, e); } + return new AbstractListenerRegistration(listener) { + @Override + protected void removeRegistration() { + synchronized (InMemoryDOMDataStore.this) { + reg.close(); + } + } + }; } - private synchronized DOMStoreThreePhaseCommitCohort submit( - final SnaphostBackedWriteTransaction writeTx) { - LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView()); + private synchronized DOMStoreThreePhaseCommitCohort submit(final SnapshotBackedWriteTransaction writeTx) { + LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView()); return new ThreePhaseCommitImpl(writeTx); } @@ -136,44 +146,78 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch return name + "-" + txCounter.getAndIncrement(); } - private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot, - final StoreMetadataNode newDataTree, final Iterable listenerTasks) { - LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion()); + private void commit(final DataAndMetadataSnapshot currentSnapshot, final StoreMetadataNode newDataTree, + final ResolveDataChangeEventsTask listenerResolver) { + LOG.debug("Updating Store snaphot version: {} with version:{}", currentSnapshot.getMetadataTree() + .getSubtreeVersion(), newDataTree.getSubtreeVersion()); - if(LOG.isTraceEnabled()) { - LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree)); + if (LOG.isTraceEnabled()) { + LOG.trace("Data Tree is {}", StoreUtils.toStringTree(newDataTree)); } - checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs"); - snapshot = DataAndMetadataSnapshot.builder() // + + final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() // .setMetadataTree(newDataTree) // .setSchemaContext(schemaContext) // .build(); - for(ChangeListenerNotifyTask task : listenerTasks) { - executor.submit(task); + /* + * The commit has to occur atomically with regard to listener + * registrations. + */ + synchronized (this) { + final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot); + checkState(success, "Store snapshot and transaction snapshot differ. This should never happen."); + + for (ChangeListenerNotifyTask task : listenerResolver.call()) { + LOG.trace("Scheduling invocation of listeners: {}", task); + executor.submit(task); + } } - } - private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction { - - private DataAndMetadataSnapshot stableSnapshot; + private static abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction { private final Object identifier; - public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) { + protected AbstractDOMStoreTransaction(final Object identifier) { this.identifier = identifier; - this.stableSnapshot = snapshot; - LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion()); - } @Override - public Object getIdentifier() { + public final Object getIdentifier() { return identifier; } + @Override + public final String toString() { + return addToStringAttributes(Objects.toStringHelper(this)).toString(); + } + + /** + * Add class-specific toString attributes. + * + * @param toStringHelper + * ToStringHelper instance + * @return ToStringHelper instance which was passed in + */ + protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + return toStringHelper.add("id", identifier); + } + } + + private static class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements + DOMStoreReadTransaction { + private DataAndMetadataSnapshot stableSnapshot; + + public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) { + super(identifier); + this.stableSnapshot = Preconditions.checkNotNull(snapshot); + LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree() + .getSubtreeVersion()); + } + @Override public void close() { + LOG.debug("Store transaction: {} : Closed", getIdentifier()); stableSnapshot = null; } @@ -183,37 +227,26 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch checkState(stableSnapshot != null, "Transaction is closed"); return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path)); } - - @Override - public String toString() { - return "SnapshotBackedReadTransaction [id =" + identifier + "]"; - } - } - private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction { - + private static class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction implements + DOMStoreWriteTransaction { private MutableDataTree mutableTree; - private final Object identifier; private InMemoryDOMDataStore store; - private boolean ready = false; - public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot, + public SnapshotBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot, final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) { - this.identifier = identifier; + super(identifier); mutableTree = MutableDataTree.from(snapshot, applyOper); this.store = store; - LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion()); - } - - @Override - public Object getIdentifier() { - return identifier; + LOG.debug("Write Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree() + .getSubtreeVersion()); } @Override public void close() { + LOG.debug("Store transaction: {} : Closed", getIdentifier()); this.mutableTree = null; this.store = null; } @@ -221,26 +254,52 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public void write(final InstanceIdentifier path, final NormalizedNode data) { checkNotReady(); - mutableTree.write(path, data); + try { + LOG.trace("Tx: {} Write: {}:{}", getIdentifier(), path, data); + mutableTree.write(path, data); + // FIXME: Add checked exception + } catch (Exception e) { + LOG.error("Tx: {}, failed to write {}:{} in {}", getIdentifier(), path, data, mutableTree, e); + } + } + + @Override + public void merge(final InstanceIdentifier path, final NormalizedNode data) { + checkNotReady(); + try { + LOG.trace("Tx: {} Merge: {}:{}", getIdentifier(), path, data); + mutableTree.merge(path, data); + // FIXME: Add checked exception + } catch (Exception e) { + LOG.error("Tx: {}, failed to write {}:{} in {}", getIdentifier(), path, data, mutableTree, e); + } } @Override public void delete(final InstanceIdentifier path) { checkNotReady(); - mutableTree.delete(path); + try { + LOG.trace("Tx: {} Delete: {}", getIdentifier(), path); + mutableTree.delete(path); + // FIXME: Add checked exception + } catch (Exception e) { + LOG.error("Tx: {}, failed to delete {} in {}", getIdentifier(), path, mutableTree, e); + } } - protected boolean isReady() { + protected final boolean isReady() { return ready; } - protected void checkNotReady() { - checkState(!ready, "Transaction is ready. No further modifications allowed."); + protected final void checkNotReady() { + checkState(!ready, "Transaction %s is ready. No further modifications allowed.", getIdentifier()); } @Override public synchronized DOMStoreThreePhaseCommitCohort ready() { + checkState(!ready, "Transaction %s is already ready.", getIdentifier()); ready = true; + LOG.debug("Store transaction: {} : Ready", getIdentifier()); mutableTree.seal(); return store.submit(this); @@ -251,13 +310,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch } @Override - public String toString() { - return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]"; + protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + return toStringHelper.add("ready", isReady()); } - } - private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements + private static class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements DOMStoreReadWriteTransaction { protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot, @@ -267,42 +325,49 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture>> read(final InstanceIdentifier path) { - return Futures.immediateFuture(getMutatedView().read(path)); - } - - @Override - public String toString() { - return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]"; + LOG.trace("Tx: {} Read: {}", getIdentifier(), path); + try { + return Futures.immediateFuture(getMutatedView().read(path)); + } catch (Exception e) { + LOG.error("Tx: {} Failed Read of {}", getIdentifier(), path, e); + throw e; + } } - } private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort { - private final SnaphostBackedWriteTransaction transaction; + private final SnapshotBackedWriteTransaction transaction; private final NodeModification modification; private DataAndMetadataSnapshot storeSnapshot; private Optional proposedSubtree; - private Iterable listenerTasks; + private ResolveDataChangeEventsTask listenerResolver; - public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) { + public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) { this.transaction = writeTransaction; this.modification = transaction.getMutatedView().getRootModification(); } @Override public ListenableFuture canCommit() { - final DataAndMetadataSnapshot snapshotCapture = snapshot; + final DataAndMetadataSnapshot snapshotCapture = snapshot.get(); final ModificationApplyOperation snapshotOperation = operationTree; return executor.submit(new Callable() { @Override public Boolean call() throws Exception { - boolean applicable = snapshotOperation.isApplicable(modification, + Boolean applicable = false; + try { + snapshotOperation.checkApplicable(PUBLIC_ROOT_PATH, modification, Optional.of(snapshotCapture.getMetadataTree())); - LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable); + applicable = true; + } catch (DataPreconditionFailedException e) { + LOG.warn("Store Tx: {} Data Precondition failed for {}.",transaction.getIdentifier(),e.getPath(),e); + applicable = false; + } + LOG.debug("Store Transaction: {} : canCommit : {}", transaction.getIdentifier(), applicable); return applicable; } }); @@ -310,14 +375,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture preCommit() { - storeSnapshot = snapshot; - if(modification.getModificationType() == ModificationType.UNMODIFIED) { + storeSnapshot = snapshot.get(); + if (modification.getModificationType() == ModificationType.UNMODIFIED) { return Futures.immediateFuture(null); } return executor.submit(new Callable() { - - @Override public Void call() throws Exception { StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree(); @@ -325,13 +388,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree), increase(metadataTree.getSubtreeVersion())); - listenerTasks = DataChangeEventResolver.create() // + listenerResolver = ResolveDataChangeEventsTask.create() // .setRootPath(PUBLIC_ROOT_PATH) // .setBeforeRoot(Optional.of(metadataTree)) // .setAfterRoot(proposedSubtree) // .setModificationRoot(modification) // - .setListenerRoot(listenerTree) // - .resolve(); + .setListenerRoot(listenerTree); return null; } @@ -347,20 +409,20 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture commit() { - if(modification.getModificationType() == ModificationType.UNMODIFIED) { + if (modification.getModificationType() == ModificationType.UNMODIFIED) { return Futures.immediateFuture(null); } - checkState(proposedSubtree != null,"Proposed subtree must be computed"); - checkState(storeSnapshot != null,"Proposed subtree must be computed"); + checkState(proposedSubtree != null, "Proposed subtree must be computed"); + checkState(storeSnapshot != null, "Proposed subtree must be computed"); // return ImmediateFuture<>; - InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks); + InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(), listenerResolver); return Futures. immediateFuture(null); } } - private class AllwaysFailOperation implements ModificationApplyOperation { + private static final class AlwaysFailOperation implements ModificationApplyOperation { @Override public Optional apply(final NodeModification modification, @@ -369,7 +431,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch } @Override - public boolean isApplicable(final NodeModification modification, final Optional storeMetadata) { + public void checkApplicable(final InstanceIdentifier path,final NodeModification modification, final Optional storeMetadata) { throw new IllegalStateException("Schema Context is not available."); }