X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=dom%2Fmdsal-dom-inmemory-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Fdom%2Fstore%2Finmemory%2FInmemoryDOMDataTreeShardWriteTransaction.java;h=3472d9c69f5956b73aa19add0dc34931fe0da874;hb=819aef314ba0fc931ca46c0f39d4f70ff49d3540;hp=95b28bdeb6c7f31bfe07b653841ecdb1f14601a8;hpb=f58e2f0995a0b4c98820fba640e82cb87c0fefa6;p=mdsal.git diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java index 95b28bdeb6..3472d9c69f 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java @@ -8,21 +8,22 @@ package org.opendaylight.mdsal.dom.store.inmemory; -import com.google.common.base.Optional; +import static java.util.Objects.requireNonNull; + import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Iterator; import java.util.Map.Entry; -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import org.opendaylight.mdsal.common.api.ReadFailedException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; +import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction; +import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -31,85 +32,102 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction { +class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable { private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class); private enum SimpleCursorOperation { MERGE { @Override - void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child, - final NormalizedNode data) { - cursor.merge(child, data); + void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child, + final NormalizedNode data) { + cur.merge(child, data); } }, DELETE { @Override - void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child, - final NormalizedNode data) { - cursor.delete(child); + void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child, + final NormalizedNode data) { + cur.delete(child); } }, WRITE { @Override - void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child, - final NormalizedNode data) { - cursor.write(child, data); + void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child, + final NormalizedNode data) { + cur.write(child, data); } }; - abstract void applyOnLeaf(DOMDataTreeWriteCursor cursor, PathArgument child, NormalizedNode data); + abstract void applyOnLeaf(DOMDataTreeWriteCursor cur, PathArgument child, NormalizedNode data); - void apply(final DOMDataTreeWriteCursor cursor, final YangInstanceIdentifier path, - final NormalizedNode data) { + void apply(final DOMDataTreeWriteCursor cur, final YangInstanceIdentifier path, + final NormalizedNode data) { int enterCount = 0; - Iterator it = path.getPathArguments().iterator(); - while (it.hasNext()) { - PathArgument currentArg = it.next(); - if (it.hasNext()) { + final Iterator it = path.getPathArguments().iterator(); + if (it.hasNext()) { + while (true) { + final PathArgument currentArg = it.next(); + if (!it.hasNext()) { + applyOnLeaf(cur, currentArg, data); + break; + } + // We need to enter one level deeper, we are not at leaf (modified) node - cursor.enter(currentArg); + cur.enter(currentArg); enterCount++; - } else { - applyOnLeaf(cursor, currentArg, data); } } - cursor.exit(enterCount); + + cur.exit(enterCount); } } + private static final AtomicLong COUNTER = new AtomicLong(); + + private final ArrayList cohorts = new ArrayList<>(); + private final InMemoryDOMDataTreeShardChangePublisher changePublisher; + private final InMemoryDOMDataTreeShardProducer producer; private final ShardDataModification modification; - private DOMDataTreeWriteCursor cursor; - private DataTree rootShardDataTree; - private DataTreeModification rootModification = null; + private final ListeningExecutorService executor; + private final DataTree rootShardDataTree; + private final String identifier; - private ArrayList cohorts = new ArrayList<>(); - private InMemoryDOMDataTreeShardChangePublisher changePublisher; + private DataTreeModification rootModification = null; + private DOMDataTreeWriteCursor cursor; private boolean finished = false; - // FIXME inject into shard? - private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - - InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root, + InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer, + final ShardDataModification root, final DataTree rootShardDataTree, - final InMemoryDOMDataTreeShardChangePublisher changePublisher) { - this.modification = Preconditions.checkNotNull(root); - this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree); - this.changePublisher = Preconditions.checkNotNull(changePublisher); + final InMemoryDOMDataTreeShardChangePublisher changePublisher, + final ListeningExecutorService executor) { + this.producer = producer; + this.modification = requireNonNull(root); + this.rootShardDataTree = requireNonNull(rootShardDataTree); + this.changePublisher = requireNonNull(changePublisher); + this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement(); + LOG.debug("Shard transaction{} created", identifier); + this.executor = executor; + } + + @Override + public String getIdentifier() { + return identifier; } private DOMDataTreeWriteCursor getCursor() { if (cursor == null) { - cursor = new ShardDataModificationCursor(modification, this); + cursor = new InMemoryShardDataModificationCursor(modification, this); } return cursor; } void delete(final YangInstanceIdentifier path) { - YangInstanceIdentifier relativePath = toRelative(path); + final YangInstanceIdentifier relativePath = toRelative(path); Preconditions.checkArgument(!YangInstanceIdentifier.EMPTY.equals(relativePath), "Deletion of shard root is not allowed"); - SimpleCursorOperation.DELETE.apply(getCursor(), relativePath , null); + SimpleCursorOperation.DELETE.apply(getCursor(), relativePath, null); } void merge(final YangInstanceIdentifier path, final NormalizedNode data) { @@ -117,42 +135,30 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT } void write(final YangInstanceIdentifier path, final NormalizedNode data) { - SimpleCursorOperation.DELETE.apply(getCursor(), toRelative(path), data); + SimpleCursorOperation.WRITE.apply(getCursor(), toRelative(path), data); } private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) { - Optional relative = + final Optional relative = path.relativeTo(modification.getPrefix().getRootIdentifier()); Preconditions.checkArgument(relative.isPresent()); return relative.get(); } - public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - // FIXME: Implement this - return null; - } - - public CheckedFuture exists(final YangInstanceIdentifier path) { - // TODO Auto-generated method stub - return null; - } - - - public Object getIdentifier() { - // TODO Auto-generated method stub - return null; - } - @Override public void close() { Preconditions.checkState(!finished, "Attempting to close an already finished transaction."); modification.closeTransactions(); - cursor.close(); + if (cursor != null) { + cursor.close(); + } + producer.transactionAborted(this); finished = true; } void cursorClosed() { - Preconditions.checkNotNull(cursor); + requireNonNull(cursor); + modification.closeCursor(); cursor = null; } @@ -164,13 +170,16 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT public void ready() { Preconditions.checkState(!finished, "Attempting to ready an already finished transaction."); Preconditions.checkState(cursor == null, "Attempting to ready a transaction that has an open cursor."); - Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction."); + requireNonNull(modification, "Attempting to ready an empty transaction."); LOG.debug("Readying open transaction on shard {}", modification.getPrefix()); rootModification = modification.seal(); - cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification, changePublisher)); - for (Entry entry : modification.getChildShards().entrySet()) { + producer.transactionReady(this, rootModification); + cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort( + rootShardDataTree, rootModification, changePublisher)); + for (final Entry entry : + modification.getChildShards().entrySet()) { cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue())); } finished = true; @@ -180,48 +189,45 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT public ListenableFuture submit() { LOG.debug("Submitting open transaction on shard {}", modification.getPrefix()); - Preconditions.checkNotNull(cohorts); + requireNonNull(cohorts); Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet."); - final ListenableFuture submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts)); - - return submit; + return executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts, this)); } @Override public ListenableFuture validate() { LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix()); - - final ListenableFuture submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts)); - return submit; + return executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts)); } @Override public ListenableFuture prepare() { LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix()); - - final ListenableFuture submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts)); - return submit; + return executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts)); } @Override public ListenableFuture commit() { LOG.debug("Commit open transaction on shard {}", modification.getPrefix()); - - final ListenableFuture submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts)); - return submit; + return executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts, this)); } - public void followUp() { + DataTreeModification getRootModification() { + requireNonNull(rootModification, "Transaction wasn't sealed yet"); + return rootModification; + } + void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) { + producer.onTransactionCommited(tx); } @Override public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) { Preconditions.checkState(!finished, "Transaction is finished/closed already."); Preconditions.checkState(cursor == null, "Previous cursor wasn't closed"); - DOMDataTreeWriteCursor ret = getCursor(); - YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier()); + final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier()); + final DOMDataTreeWriteCursor ret = getCursor(); ret.enter(relativePath.getPathArguments()); return ret; }