X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=dom%2Fmdsal-dom-inmemory-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Fdom%2Fstore%2Finmemory%2FInmemoryDOMDataTreeShardWriteTransaction.java;h=d3265fdab3716d05d95bad6f62e43c51ca547459;hb=dca009bba2d4ceb2e13537f3ac6f9a5f1b05302f;hp=09e07b52776d1d54db367779cc7ed0caa817721d;hpb=a222d4e5d6928065adf09a16c112c73221b8403f;p=mdsal.git diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java index 09e07b5277..d3265fdab3 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java @@ -13,16 +13,15 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Iterator; import java.util.Map.Entry; -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -31,7 +30,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction { +class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable { private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class); @@ -63,9 +62,9 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT void apply(final DOMDataTreeWriteCursor cursor, final YangInstanceIdentifier path, final NormalizedNode data) { int enterCount = 0; - Iterator it = path.getPathArguments().iterator(); + final Iterator it = path.getPathArguments().iterator(); while (it.hasNext()) { - PathArgument currentArg = it.next(); + final PathArgument currentArg = it.next(); if (it.hasNext()) { // We need to enter one level deeper, we are not at leaf (modified) node cursor.enter(currentArg); @@ -78,34 +77,48 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT } } - private final ShardDataModification modification; - private DOMDataTreeWriteCursor cursor; - private DataTree rootShardDataTree; - private DataTreeModification rootModification = null; + private static final AtomicLong COUNTER = new AtomicLong(); - private ArrayList cohorts = new ArrayList<>(); - private InMemoryDOMDataTreeShardChangePublisher changePublisher; + private final ArrayList cohorts = new ArrayList<>(); + private final InMemoryDOMDataTreeShardChangePublisher changePublisher; + private final InMemoryDOMDataTreeShardProducer producer; + private final ShardDataModification modification; + private final ListeningExecutorService executor; + private final DataTree rootShardDataTree; + private final String identifier; - // FIXME inject into shard? - private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + private DataTreeModification rootModification = null; + private DOMDataTreeWriteCursor cursor; + private boolean finished = false; - InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root, + InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer, + final ShardDataModification root, final DataTree rootShardDataTree, - final InMemoryDOMDataTreeShardChangePublisher changePublisher) { + final InMemoryDOMDataTreeShardChangePublisher changePublisher, + final ListeningExecutorService executor) { + this.producer = producer; this.modification = Preconditions.checkNotNull(root); this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree); this.changePublisher = Preconditions.checkNotNull(changePublisher); + this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement(); + LOG.debug("Shard transaction{} created", identifier); + this.executor = executor; + } + + @Override + public String getIdentifier() { + return identifier; } private DOMDataTreeWriteCursor getCursor() { if (cursor == null) { - cursor = new ShardDataModificationCursor(modification); + cursor = new ShardDataModificationCursor(modification, this); } return cursor; } void delete(final YangInstanceIdentifier path) { - YangInstanceIdentifier relativePath = toRelative(path); + final YangInstanceIdentifier relativePath = toRelative(path); Preconditions.checkArgument(!YangInstanceIdentifier.EMPTY.equals(relativePath), "Deletion of shard root is not allowed"); SimpleCursorOperation.DELETE.apply(getCursor(), relativePath , null); @@ -120,42 +133,58 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT } private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) { - Optional relative = + final Optional relative = path.relativeTo(modification.getPrefix().getRootIdentifier()); Preconditions.checkArgument(relative.isPresent()); return relative.get(); } public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - // FIXME: Implement this - return null; + throw new UnsupportedOperationException("Not implemented yet"); } public CheckedFuture exists(final YangInstanceIdentifier path) { - // TODO Auto-generated method stub - return null; + throw new UnsupportedOperationException("Not implemented yet"); } + @Override + public void close() { + Preconditions.checkState(!finished, "Attempting to close an already finished transaction."); + modification.closeTransactions(); + if (cursor != null) { + cursor.close(); + } + producer.transactionAborted(this); + finished = true; + } - public Object getIdentifier() { - // TODO Auto-generated method stub - return null; + void cursorClosed() { + Preconditions.checkNotNull(cursor); + modification.closeCursor(); + cursor = null; } - public void close() { - // TODO Auto-generated method stub + public boolean isFinished() { + return finished; } @Override public void ready() { + Preconditions.checkState(!finished, "Attempting to ready an already finished transaction."); + Preconditions.checkState(cursor == null, "Attempting to ready a transaction that has an open cursor."); + Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction."); LOG.debug("Readying open transaction on shard {}", modification.getPrefix()); rootModification = modification.seal(); - cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification, changePublisher)); - for (Entry entry : modification.getChildShards().entrySet()) { + producer.transactionReady(this, rootModification); + cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort( + rootShardDataTree, rootModification, changePublisher)); + for (final Entry entry : + modification.getChildShards().entrySet()) { cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue())); } + finished = true; } @Override @@ -163,9 +192,10 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT LOG.debug("Submitting open transaction on shard {}", modification.getPrefix()); Preconditions.checkNotNull(cohorts); - Preconditions.checkState(!cohorts.isEmpty(), "Submitting an empty transaction"); + Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet."); - final ListenableFuture submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts)); + final ListenableFuture submit = executor.submit(new ShardSubmitCoordinationTask( + modification.getPrefix(), cohorts, this)); return submit; } @@ -174,7 +204,8 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT public ListenableFuture validate() { LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix()); - final ListenableFuture submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts)); + final ListenableFuture submit = executor.submit(new ShardCanCommitCoordinationTask( + modification.getPrefix(), cohorts)); return submit; } @@ -182,7 +213,8 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT public ListenableFuture prepare() { LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix()); - final ListenableFuture submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts)); + final ListenableFuture submit = executor.submit(new ShardPreCommitCoordinationTask( + modification.getPrefix(), cohorts)); return submit; } @@ -190,18 +222,26 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT public ListenableFuture commit() { LOG.debug("Commit open transaction on shard {}", modification.getPrefix()); - final ListenableFuture submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts)); + final ListenableFuture submit = executor.submit(new ShardCommitCoordinationTask( + modification.getPrefix(), cohorts, this)); return submit; } - public void followUp() { + DataTreeModification getRootModification() { + Preconditions.checkNotNull(rootModification, "Transaction wasn't sealed yet"); + return rootModification; + } + void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) { + producer.onTransactionCommited(tx); } @Override public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) { - DOMDataTreeWriteCursor ret = getCursor(); - YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier()); + Preconditions.checkState(!finished, "Transaction is finished/closed already."); + Preconditions.checkState(cursor == null, "Previous cursor wasn't closed"); + final DOMDataTreeWriteCursor ret = getCursor(); + final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier()); ret.enter(relativePath.getPathArguments()); return ret; }