X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=dom%2Fmdsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Fdom%2Fbroker%2FShardedDOMDataTreeProducer.java;h=b7c2e58e523a653a1135bf5be87ceb9f36b44c16;hb=c182e1d32720044c42a187725e5effc57313b31d;hp=c19f87af0802bdfe1656bca23d67eaec2e6d807c;hpb=7f6484ef9fe7698bc235ee597354ac54c0cc29f1;p=mdsal.git diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java index c19f87af08..b7c2e58e52 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java @@ -8,226 +8,275 @@ package org.opendaylight.mdsal.dom.broker; import com.google.common.base.Preconditions; -import com.google.common.collect.BiMap; -import com.google.common.collect.ImmutableBiMap; -import com.google.common.collect.ImmutableBiMap.Builder; -import com.google.common.collect.ImmutableMap; +import com.google.common.base.Verify; import com.google.common.collect.ImmutableSet; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; import java.util.Set; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerBusyException; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; -import org.opendaylight.mdsal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.mdsal.dom.spi.store.DOMStore; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { +class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTreeProducer.class); - private final BiMap shardToChain; - private final Map idToShard; + + private final Set subtrees; private final ShardedDOMDataTree dataTree; - @GuardedBy("this") - private Map children = Collections.emptyMap(); - @GuardedBy("this") - private DOMDataWriteTransaction openTx; - @GuardedBy("this") - private boolean closed; + private static final AtomicReferenceFieldUpdater + CURRENT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class, + ShardedDOMDataTreeWriteTransaction.class, "currentTx"); + private volatile ShardedDOMDataTreeWriteTransaction currentTx; - @GuardedBy("this") - private ShardedDOMDataTreeListenerContext attachedListener; + private static final AtomicReferenceFieldUpdater + OPEN_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class, + ShardedDOMDataTreeWriteTransaction.class, "openTx"); + private volatile ShardedDOMDataTreeWriteTransaction openTx; - ShardedDOMDataTreeProducer(final ShardedDOMDataTree dataTree, final Map shardMap, final Set shards) { - this.dataTree = Preconditions.checkNotNull(dataTree); + private static final AtomicReferenceFieldUpdater + LAST_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class, + ShardedDOMDataTreeWriteTransaction.class, "lastTx"); + private volatile ShardedDOMDataTreeWriteTransaction lastTx; - // Create shard -> chain map - final Builder cb = ImmutableBiMap.builder(); - final Queue es = new LinkedList<>(); - - for (final DOMDataTreeShard s : shards) { - if (s instanceof DOMStore) { - try { - final DOMStoreTransactionChain c = ((DOMStore)s).createTransactionChain(); - LOG.trace("Using DOMStore chain {} to access shard {}", c, s); - cb.put(s, c); - } catch (final Exception e) { - LOG.error("Failed to instantiate chain for shard {}", s, e); - es.add(e); - } - } else { - LOG.error("Unhandled shard instance type {}", s.getClass()); - } - } - this.shardToChain = cb.build(); - - // An error was encountered, close chains and report the error - if (shardToChain.size() != shards.size()) { - for (final DOMStoreTransactionChain c : shardToChain.values()) { - try { - c.close(); - } catch (final Exception e) { - LOG.warn("Exception raised while closing chain {}", c, e); - } - } + private static final AtomicIntegerFieldUpdater CLOSED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class, "closed"); + private volatile int closed; - final IllegalStateException e = new IllegalStateException("Failed to completely allocate contexts", es.poll()); - while (!es.isEmpty()) { - e.addSuppressed(es.poll()); - } + private volatile ShardedDOMDataTreeListenerContext attachedListener; + private volatile ProducerLayout layout; - throw e; - } + private ShardedDOMDataTreeProducer(final ShardedDOMDataTree dataTree, + final Collection subtrees, + final Map shardMap) { + this.dataTree = Preconditions.checkNotNull(dataTree); + this.subtrees = ImmutableSet.copyOf(subtrees); + this.layout = ProducerLayout.create(shardMap); + } - idToShard = ImmutableMap.copyOf(shardMap); + static DOMDataTreeProducer create(final ShardedDOMDataTree dataTree, + final Collection subtrees, + final Map shardMap) { + return new ShardedDOMDataTreeProducer(dataTree, subtrees, shardMap); } - @Override - public synchronized DOMDataWriteTransaction createTransaction(final boolean isolated) { - Preconditions.checkState(!closed, "Producer is already closed"); + private void checkNotClosed() { + Preconditions.checkState(closed == 0, "Producer is already closed"); + } + + private void checkIdle() { Preconditions.checkState(openTx == null, "Transaction %s is still open", openTx); + } - // Allocate backing transactions - final Map shardToTx = new HashMap<>(); - for (final Entry e : shardToChain.entrySet()) { - shardToTx.put(e.getKey(), e.getValue().newWriteOnlyTransaction()); - } + void subshardAdded(final Map shardMap) { + checkIdle(); + + layout = layout.reshard(shardMap); + } - // Create the ID->transaction map - final ImmutableMap.Builder b = ImmutableMap.builder(); - for (final Entry e : idToShard.entrySet()) { - b.put(e.getKey(), shardToTx.get(e.getValue())); + @Override + public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) { + checkNotClosed(); + checkIdle(); + + LOG.debug("Creating transaction from producer {}", this); + + final ShardedDOMDataTreeWriteTransaction current = CURRENT_UPDATER.getAndSet(this, null); + final ShardedDOMDataTreeWriteTransaction ret; + if (isolated) { + ret = createIsolatedTransaction(layout, current); + } else { + ret = createReusedTransaction(layout, current); } - final ShardedDOMDataWriteTransaction ret = new ShardedDOMDataWriteTransaction(this, b.build()); - openTx = ret; + final boolean success = OPEN_UPDATER.compareAndSet(this, null, ret); + Preconditions.checkState(success, "Illegal concurrent access to producer %s detected", this); return ret; } + // This may look weird, but this has side-effects on local's producers, hence it needs to be properly synchronized + // so that it happens-after submitTransaction() which may have been stolen by a callback. @GuardedBy("this") - private boolean haveSubtree(final DOMDataTreeIdentifier subtree) { - for (final DOMDataTreeIdentifier i : idToShard.keySet()) { - if (i.contains(subtree)) { - return true; - } + private ShardedDOMDataTreeWriteTransaction createTransaction(final ProducerLayout local) { + return new ShardedDOMDataTreeWriteTransaction(this, local.createTransactions(), local); + + } + + // Isolated case. If we have a previous transaction, submit it before returning this one. + private synchronized ShardedDOMDataTreeWriteTransaction createIsolatedTransaction( + final ProducerLayout local, final ShardedDOMDataTreeWriteTransaction current) { + if (current != null) { + submitTransaction(current); } - return false; + return createTransaction(local); } - @GuardedBy("this") - private DOMDataTreeProducer lookupChild(final DOMDataTreeIdentifier s) { - for (final Entry e : children.entrySet()) { - if (e.getKey().contains(s)) { - return e.getValue(); + private ShardedDOMDataTreeWriteTransaction createReusedTransaction(final ProducerLayout local, + final ShardedDOMDataTreeWriteTransaction current) { + if (current != null) { + // Lock-free fast path + if (local.equals(current.getLayout())) { + LOG.debug("Reusing previous transaction {} since there is still a transaction inflight", + current.getIdentifier()); + return current; + } + + synchronized (this) { + submitTransaction(current); + return createTransaction(local); } } - return null; + // Null indicates we have not seen a previous transaction -- which does not mean it is ready, as it may have + // been stolen and in is process of being submitted. + synchronized (this) { + return createTransaction(local); + } + } + + @GuardedBy("this") + private void submitTransaction(final ShardedDOMDataTreeWriteTransaction tx) { + lastTx = tx; + tx.doSubmit(this::transactionSuccessful, this::transactionFailed); } @Override - public synchronized DOMDataTreeProducer createProducer(final Collection subtrees) { - Preconditions.checkState(!closed, "Producer is already closed"); - Preconditions.checkState(openTx == null, "Transaction %s is still open", openTx); + public DOMDataTreeProducer createProducer(final Collection subtrees) { + checkNotClosed(); + checkIdle(); + + final ProducerLayout local = layout; for (final DOMDataTreeIdentifier s : subtrees) { // Check if the subtree was visible at any time - Preconditions.checkArgument(haveSubtree(s), "Subtree %s was never available in producer %s", s, this); + Preconditions.checkArgument(local.haveSubtree(s), "Subtree %s was never available in producer %s", s, this); // Check if the subtree has not been delegated to a child - final DOMDataTreeProducer child = lookupChild(s); + final DOMDataTreeProducer child = local.lookupChild(s); Preconditions.checkArgument(child == null, "Subtree %s is delegated to child producer %s", s, child); // Check if part of the requested subtree is not delegated to a child. - for (final DOMDataTreeIdentifier c : children.keySet()) { - if (s.contains(c)) { - throw new IllegalArgumentException(String.format("Subtree %s cannot be delegated as it is superset of already-delegated %s", s, c)); - } + for (final DOMDataTreeIdentifier c : local.getChildTrees()) { + Preconditions.checkArgument(!s.contains(c), + "Subtree %s cannot be delegated as it is a superset of already-delegated %s", s, c); } } - final DOMDataTreeProducer ret = dataTree.createProducer(this, subtrees); - final ImmutableMap.Builder cb = ImmutableMap.builder(); - cb.putAll(children); - for (final DOMDataTreeIdentifier s : subtrees) { - cb.put(s, ret); + + final DOMDataTreeProducer ret; + synchronized (this) { + ret = dataTree.createProducer(this, subtrees); } - children = cb.build(); + layout = local.addChild(ret, subtrees); return ret; } - boolean isDelegatedToChild(DOMDataTreeIdentifier path) { - for (final DOMDataTreeIdentifier c : children.keySet()) { - if (c.contains(path)) { - return true; + boolean isDelegatedToChild(final DOMDataTreeIdentifier path) { + return layout.lookupChild(path) != null; + } + + @Override + public void close() throws DOMDataTreeProducerException { + if (openTx != null) { + throw new DOMDataTreeProducerBusyException(String.format("Transaction %s is still open", openTx)); + } + + if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) { + synchronized (this) { + dataTree.destroyProducer(this); } } - return false; } + protected Set getSubtrees() { + return subtrees; + } - @Override - public synchronized void close() throws DOMDataTreeProducerException { - if (!closed) { - if (openTx != null) { - throw new DOMDataTreeProducerBusyException(String.format("Transaction %s is still open", openTx)); + void cancelTransaction(final ShardedDOMDataTreeWriteTransaction transaction) { + final boolean success = OPEN_UPDATER.compareAndSet(this, transaction, null); + if (success) { + LOG.debug("Transaction {} cancelled", transaction); + } else { + LOG.warn("Transaction {} is not open in producer {}", transaction, this); + } + } + + // Called when the user submits a transaction + void transactionSubmitted(final ShardedDOMDataTreeWriteTransaction transaction) { + final boolean wasOpen = OPEN_UPDATER.compareAndSet(this, transaction, null); + Preconditions.checkState(wasOpen, "Attempted to submit non-open transaction %s", transaction); + + if (lastTx == null) { + // No transaction outstanding, we need to submit it now + synchronized (this) { + submitTransaction(transaction); } - closed = true; - dataTree.destroyProducer(this); + return; } - } - static DOMDataTreeProducer create(final ShardedDOMDataTree dataTree, final Map shardMap) { - /* - * FIXME: we do not allow multiple multiple shards in a producer because we do not implement the - * synchronization primitives yet - */ - final Set shards = ImmutableSet.copyOf(shardMap.values()); - if (shards.size() > 1) { - throw new UnsupportedOperationException("Cross-shard producers are not supported yet"); + // There is a potentially-racing submitted transaction. Publish the new one, which may end up being + // picked up by processNextTransaction. + final boolean success = CURRENT_UPDATER.compareAndSet(this, null, transaction); + Verify.verify(success); + + // Now a quick check: if the racing transaction completed in between, it may have missed the current + // transaction, hence we need to re-check + if (lastTx == null) { + submitCurrentTransaction(); } + } - return new ShardedDOMDataTreeProducer(dataTree, shardMap, shards); + private void submitCurrentTransaction() { + final ShardedDOMDataTreeWriteTransaction current = currentTx; + if (current != null) { + synchronized (this) { + if (CURRENT_UPDATER.compareAndSet(this, current, null)) { + submitTransaction(current); + } + } + } } - Set getSubtrees() { - return idToShard.keySet(); + private void transactionSuccessful(final ShardedDOMDataTreeWriteTransaction tx) { + LOG.debug("Transaction {} completed successfully", tx.getIdentifier()); + + tx.onTransactionSuccess(null); + transactionCompleted(tx); } - synchronized void cancelTransaction(final ShardedDOMDataWriteTransaction transaction) { - if (!openTx.equals(transaction)) { - LOG.warn("Transaction {} is not open in producer {}", transaction, this); - return; - } + private void transactionFailed(final ShardedDOMDataTreeWriteTransaction tx, final Throwable throwable) { + LOG.debug("Transaction {} failed", tx.getIdentifier(), throwable); - LOG.debug("Transaction {} cancelled", transaction); - openTx = null; + tx.onTransactionFailure(throwable); + // FIXME: transaction failure should result in a hard error + transactionCompleted(tx); } - synchronized void transactionSubmitted(ShardedDOMDataWriteTransaction transaction) { - Preconditions.checkState(openTx.equals(transaction)); - openTx = null; + private void transactionCompleted(final ShardedDOMDataTreeWriteTransaction tx) { + final boolean wasLast = LAST_UPDATER.compareAndSet(this, tx, null); + if (wasLast) { + submitCurrentTransaction(); + } } - synchronized void boundToListener(ShardedDOMDataTreeListenerContext listener) { - // FIXME: Add option to dettach - Preconditions.checkState(this.attachedListener == null, - "Producer %s is already attached to other listener.", - listener.getListener()); + void bindToListener(final ShardedDOMDataTreeListenerContext listener) { + Preconditions.checkNotNull(listener); + + final ShardedDOMDataTreeListenerContext local = attachedListener; + if (local != null) { + throw new IllegalStateException(String.format("Producer %s is already attached to listener %s", this, + local.getListener())); + } + this.attachedListener = listener; } }