X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=dom%2Fmdsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Fdom%2Fbroker%2FShardedDOMDataTreeProducer.java;h=022215a6b2fc90645dfba4b232d90b203f9117d9;hb=5f693add15c8702d72e0018ef2d30af076a5e537;hp=837871c03156addd19e4d5dac364fe38ff794ac8;hpb=395348596fcc6296e1a9ed0d9899b5aa16f08625;p=mdsal.git diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java index 837871c031..022215a6b2 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java @@ -11,16 +11,17 @@ import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.BiMap; +import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableBiMap.Builder; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import java.util.Collection; -import java.util.Collections; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; @@ -31,6 +32,7 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardProducer; import org.opendaylight.mdsal.dom.store.inmemory.WriteableDOMDataTreeShard; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +48,6 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { private static final AtomicReferenceFieldUpdater CURRENT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class, ShardedDOMDataTreeWriteTransaction.class, "currentTx"); - @SuppressWarnings("unused") private volatile ShardedDOMDataTreeWriteTransaction currentTx; private static final AtomicReferenceFieldUpdater @@ -59,10 +60,14 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { ShardedDOMDataTreeWriteTransaction.class, "lastTx"); private volatile ShardedDOMDataTreeWriteTransaction lastTx; + private static final AtomicIntegerFieldUpdater CLOSED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class, "closed"); + private volatile int closed; + @GuardedBy("this") - private Map children = Collections.emptyMap(); + private Map children = ImmutableMap.of(); @GuardedBy("this") - private boolean closed; + private Set childRoots = ImmutableSet.of(); @GuardedBy("this") private ShardedDOMDataTreeListenerContext attachedListener; @@ -91,17 +96,6 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { return new ShardedDOMDataTreeProducer(dataTree, subtrees, shardMap, shardToIdentifiers); } - void subshardAdded(final Map shardMap) { - Preconditions.checkState(openTx == null, "Transaction %s is still open", openTx); - final Multimap shardToIdentifiers = ArrayListMultimap.create(); - // map which identifier belongs to which shard - for (final Entry entry : shardMap.entrySet()) { - shardToIdentifiers.put(entry.getValue(), entry.getKey()); - } - this.idToProducer = mapIdsToProducer(shardToIdentifiers); - idToShard = ImmutableMap.copyOf(shardMap); - } - private static BiMap mapIdsToProducer( final Multimap shardToId) { final Builder idToProducerBuilder = ImmutableBiMap.builder(); @@ -122,36 +116,62 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { return idToProducerBuilder.build(); } - @Override - public synchronized DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) { - Preconditions.checkState(!closed, "Producer is already closed"); + private void checkNotClosed() { + Preconditions.checkState(closed == 0, "Producer is already closed"); + } + + private void checkIdle() { Preconditions.checkState(openTx == null, "Transaction %s is still open", openTx); + } + + void subshardAdded(final Map shardMap) { + checkIdle(); + + final Multimap shardToIdentifiers = ArrayListMultimap.create(); + // map which identifier belongs to which shard + for (final Entry entry : shardMap.entrySet()) { + shardToIdentifiers.put(entry.getValue(), entry.getKey()); + } + this.idToProducer = mapIdsToProducer(shardToIdentifiers); + idToShard = ImmutableMap.copyOf(shardMap); + } + + @Override + public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) { + checkNotClosed(); + checkIdle(); - LOG.debug("Creating transaction from producer"); - final ShardedDOMDataTreeWriteTransaction current = CURRENT_UPDATER.getAndSet(this, null); final ShardedDOMDataTreeWriteTransaction ret; + LOG.debug("Creating transaction from producer {}", this); + + final ShardedDOMDataTreeWriteTransaction current = CURRENT_UPDATER.getAndSet(this, null); if (isolated) { // Isolated case. If we have a previous transaction, submit it before returning this one. - if (current != null) { - submitTransaction(current); + synchronized (this) { + if (current != null) { + submitTransaction(current); + } + ret = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, childRoots); } - ret = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, children, true); } else { // Non-isolated case, see if we can reuse the transaction if (current != null) { LOG.debug("Reusing previous transaction {} since there is still a transaction inflight", - current.getIdentifier()); + current.getIdentifier()); ret = current; } else { - ret = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, children, false); + synchronized (this) { + ret = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, childRoots); + } } } final boolean success = OPEN_UPDATER.compareAndSet(this, null, ret); - Verify.verify(success); + Preconditions.checkState(success, "Illegal concurrent access to producer %s detected", this); return ret; } + @GuardedBy("this") private void submitTransaction(final ShardedDOMDataTreeWriteTransaction current) { lastTx = current; current.doSubmit(this::transactionSuccessful, this::transactionFailed); @@ -180,9 +200,9 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { } @Override - public synchronized DOMDataTreeProducer createProducer(final Collection subtrees) { - Preconditions.checkState(!closed, "Producer is already closed"); - Preconditions.checkState(openTx == null, "Transaction %s is still open", openTx); + public DOMDataTreeProducer createProducer(final Collection subtrees) { + checkNotClosed(); + checkIdle(); for (final DOMDataTreeIdentifier s : subtrees) { // Check if the subtree was visible at any time @@ -193,22 +213,24 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { // Check if part of the requested subtree is not delegated to a child. for (final DOMDataTreeIdentifier c : children.keySet()) { - if (s.contains(c)) { - throw new IllegalArgumentException(String.format("Subtree %s cannot be delegated as it is" - + " superset of already-delegated %s", s, c)); - } + Preconditions.checkArgument(!s.contains(c), + "Subtree %s cannot be delegated as it is a superset of already-delegated %s", s, c); } } - final DOMDataTreeProducer ret = dataTree.createProducer(this, subtrees); - final ImmutableMap.Builder cb = ImmutableMap.builder(); - cb.putAll(children); - for (final DOMDataTreeIdentifier s : subtrees) { - cb.put(s, ret); - } + synchronized (this) { + final DOMDataTreeProducer ret = dataTree.createProducer(this, subtrees); + final ImmutableMap.Builder cb = ImmutableMap.builder(); + cb.putAll(children); + for (final DOMDataTreeIdentifier s : subtrees) { + cb.put(s, ret); + } - children = cb.build(); - return ret; + children = cb.build(); + childRoots = ImmutableSet.copyOf(Collections2.transform(children.keySet(), + DOMDataTreeIdentifier::getRootIdentifier)); + return ret; + } } boolean isDelegatedToChild(final DOMDataTreeIdentifier path) { @@ -222,14 +244,15 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { @Override - public synchronized void close() throws DOMDataTreeProducerException { - if (!closed) { - if (openTx != null) { - throw new DOMDataTreeProducerBusyException(String.format("Transaction %s is still open", openTx)); - } + public void close() throws DOMDataTreeProducerException { + if (openTx != null) { + throw new DOMDataTreeProducerBusyException(String.format("Transaction %s is still open", openTx)); + } - closed = true; - dataTree.destroyProducer(this); + if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) { + synchronized (this) { + dataTree.destroyProducer(this); + } } } @@ -246,47 +269,61 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { } } - void processTransaction(final ShardedDOMDataTreeWriteTransaction transaction) { + // Called when the user submits a transaction + void transactionSubmitted(final ShardedDOMDataTreeWriteTransaction transaction) { final boolean wasOpen = OPEN_UPDATER.compareAndSet(this, transaction, null); - Verify.verify(wasOpen); - - if (lastTx != null) { - final boolean success = CURRENT_UPDATER.compareAndSet(this, null, transaction); - Verify.verify(success); - if (lastTx == null) { - // Dispatch after requeue - processCurrentTransaction(); + Preconditions.checkState(wasOpen, "Attempted to submit non-open transaction %s", transaction); + + if (lastTx == null) { + // No transaction outstanding, we need to submit it now + synchronized (this) { + submitTransaction(transaction); + } + + return; + } + + // There is a potentially-racing submitted transaction. Publish the new one, which may end up being + // picked up by processNextTransaction. + final boolean success = CURRENT_UPDATER.compareAndSet(this, null, transaction); + Verify.verify(success); + + // Now a quick check: if the racing transaction completed in between, it may have missed the current + // transaction, hence we need to re-check + if (lastTx == null) { + submitCurrentTransaction(); + } + } + + private void submitCurrentTransaction() { + final ShardedDOMDataTreeWriteTransaction current = currentTx; + if (current != null) { + synchronized (this) { + if (CURRENT_UPDATER.compareAndSet(this, current, null)) { + submitTransaction(current); + } } - } else { - submitTransaction(transaction); } } - void transactionSuccessful(final ShardedDOMDataTreeWriteTransaction tx, final Void result) { + private void transactionSuccessful(final ShardedDOMDataTreeWriteTransaction tx) { LOG.debug("Transaction {} completed successfully", tx.getIdentifier()); - tx.onTransactionSuccess(result); - processNextTransaction(tx); + tx.onTransactionSuccess(null); + transactionCompleted(tx); } - void transactionFailed(final ShardedDOMDataTreeWriteTransaction tx, final Throwable throwable) { + private void transactionFailed(final ShardedDOMDataTreeWriteTransaction tx, final Throwable throwable) { LOG.debug("Transaction {} failed", tx.getIdentifier(), throwable); tx.onTransactionFailure(throwable); - processNextTransaction(tx); - } - - private void processCurrentTransaction() { - final ShardedDOMDataTreeWriteTransaction current = CURRENT_UPDATER.getAndSet(this, null); - if (current != null) { - submitTransaction(current); - } + transactionCompleted(tx); } - private void processNextTransaction(final ShardedDOMDataTreeWriteTransaction tx) { + private void transactionCompleted(final ShardedDOMDataTreeWriteTransaction tx) { final boolean wasLast = LAST_UPDATER.compareAndSet(this, tx, null); if (wasLast) { - processCurrentTransaction(); + submitCurrentTransaction(); } }