package org.opendaylight.mdsal.dom.broker;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.BiMap;
+import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableBiMap.Builder;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardProducer;
import org.opendaylight.mdsal.dom.store.inmemory.WriteableDOMDataTreeShard;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ShardedDOMDataTreeProducer implements DOMDataTreeProducer {
private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTreeProducer.class);
+
private final Set<DOMDataTreeIdentifier> subtrees;
private final ShardedDOMDataTree dataTree;
private BiMap<DOMDataTreeIdentifier, DOMDataTreeShardProducer> idToProducer = ImmutableBiMap.of();
private Map<DOMDataTreeIdentifier, DOMDataTreeShard> idToShard;
+ private static final AtomicReferenceFieldUpdater<ShardedDOMDataTreeProducer, ShardedDOMDataTreeWriteTransaction>
+ CURRENT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class,
+ ShardedDOMDataTreeWriteTransaction.class, "currentTx");
+ @SuppressWarnings("unused")
+ private volatile ShardedDOMDataTreeWriteTransaction currentTx;
+
+ private static final AtomicReferenceFieldUpdater<ShardedDOMDataTreeProducer, ShardedDOMDataTreeWriteTransaction>
+ OPEN_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class,
+ ShardedDOMDataTreeWriteTransaction.class, "openTx");
+ private volatile ShardedDOMDataTreeWriteTransaction openTx;
+
+ private static final AtomicReferenceFieldUpdater<ShardedDOMDataTreeProducer, ShardedDOMDataTreeWriteTransaction>
+ LAST_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class,
+ ShardedDOMDataTreeWriteTransaction.class, "lastTx");
+ private volatile ShardedDOMDataTreeWriteTransaction lastTx;
+
@GuardedBy("this")
- private DOMDataTreeCursorAwareTransaction openTx;
+ private Map<DOMDataTreeIdentifier, DOMDataTreeProducer> children = ImmutableMap.of();
@GuardedBy("this")
- private Map<DOMDataTreeIdentifier, DOMDataTreeProducer> children = Collections.emptyMap();
+ private Set<YangInstanceIdentifier> childRoots = ImmutableSet.of();
@GuardedBy("this")
private boolean closed;
idToShard = ImmutableMap.copyOf(shardMap);
}
- private BiMap<DOMDataTreeIdentifier, DOMDataTreeShardProducer> mapIdsToProducer(final Multimap<DOMDataTreeShard, DOMDataTreeIdentifier> shardToId) {
+ private static BiMap<DOMDataTreeIdentifier, DOMDataTreeShardProducer> mapIdsToProducer(
+ final Multimap<DOMDataTreeShard, DOMDataTreeIdentifier> shardToId) {
final Builder<DOMDataTreeIdentifier, DOMDataTreeShardProducer> idToProducerBuilder = ImmutableBiMap.builder();
for (final Entry<DOMDataTreeShard, Collection<DOMDataTreeIdentifier>> entry : shardToId.asMap().entrySet()) {
if (entry.getKey() instanceof WriteableDOMDataTreeShard) {
//create a single producer for all prefixes in a single shard
- final DOMDataTreeShardProducer producer = ((WriteableDOMDataTreeShard) entry.getKey()).createProducer(entry.getValue());
+ final DOMDataTreeShardProducer producer = ((WriteableDOMDataTreeShard) entry.getKey())
+ .createProducer(entry.getValue());
// id mapped to producers
for (final DOMDataTreeIdentifier id : entry.getValue()) {
idToProducerBuilder.put(id, producer);
Preconditions.checkState(!closed, "Producer is already closed");
Preconditions.checkState(openTx == null, "Transaction %s is still open", openTx);
- this.openTx = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, children);
+ LOG.debug("Creating transaction from producer");
+ final ShardedDOMDataTreeWriteTransaction current = CURRENT_UPDATER.getAndSet(this, null);
+ final ShardedDOMDataTreeWriteTransaction ret;
+ if (isolated) {
+ // Isolated case. If we have a previous transaction, submit it before returning this one.
+ if (current != null) {
+ submitTransaction(current);
+ }
+ ret = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, childRoots);
+ } else {
+ // Non-isolated case, see if we can reuse the transaction
+ if (current != null) {
+ LOG.debug("Reusing previous transaction {} since there is still a transaction inflight",
+ current.getIdentifier());
+ ret = current;
+ } else {
+ ret = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, childRoots);
+ }
+ }
+
+ final boolean success = OPEN_UPDATER.compareAndSet(this, null, ret);
+ Verify.verify(success);
+ return ret;
+ }
- return openTx;
+ private void submitTransaction(final ShardedDOMDataTreeWriteTransaction current) {
+ lastTx = current;
+ current.doSubmit(this::transactionSuccessful, this::transactionFailed);
}
@GuardedBy("this")
}
@GuardedBy("this")
- private DOMDataTreeProducer lookupChild(final DOMDataTreeIdentifier s) {
+ private DOMDataTreeProducer lookupChild(final DOMDataTreeIdentifier domDataTreeIdentifier) {
for (final Entry<DOMDataTreeIdentifier, DOMDataTreeProducer> e : children.entrySet()) {
- if (e.getKey().contains(s)) {
+ if (e.getKey().contains(domDataTreeIdentifier)) {
return e.getValue();
}
}
// Check if part of the requested subtree is not delegated to a child.
for (final DOMDataTreeIdentifier c : children.keySet()) {
if (s.contains(c)) {
- throw new IllegalArgumentException(String.format("Subtree %s cannot be delegated as it is superset of already-delegated %s", s, c));
+ throw new IllegalArgumentException(String.format("Subtree %s cannot be delegated as it is"
+ + " superset of already-delegated %s", s, c));
}
}
}
}
children = cb.build();
+ childRoots = ImmutableSet.copyOf(Collections2.transform(children.keySet(),
+ DOMDataTreeIdentifier::getRootIdentifier));
return ret;
}
return subtrees;
}
- synchronized void cancelTransaction(final ShardedDOMDataTreeWriteTransaction transaction) {
- if (!openTx.equals(transaction)) {
+ void cancelTransaction(final ShardedDOMDataTreeWriteTransaction transaction) {
+ final boolean success = OPEN_UPDATER.compareAndSet(this, transaction, null);
+ if (success) {
+ LOG.debug("Transaction {} cancelled", transaction);
+ } else {
LOG.warn("Transaction {} is not open in producer {}", transaction, this);
- return;
}
+ }
+
+ void processTransaction(final ShardedDOMDataTreeWriteTransaction transaction) {
+ final boolean wasOpen = OPEN_UPDATER.compareAndSet(this, transaction, null);
+ Verify.verify(wasOpen);
- LOG.debug("Transaction {} cancelled", transaction);
- openTx = null;
+ if (lastTx != null) {
+ final boolean success = CURRENT_UPDATER.compareAndSet(this, null, transaction);
+ Verify.verify(success);
+ if (lastTx == null) {
+ // Dispatch after requeue
+ processCurrentTransaction();
+ }
+ } else {
+ submitTransaction(transaction);
+ }
+ }
+
+ void transactionSuccessful(final ShardedDOMDataTreeWriteTransaction tx) {
+ LOG.debug("Transaction {} completed successfully", tx.getIdentifier());
+
+ tx.onTransactionSuccess(null);
+ processNextTransaction(tx);
+ }
+
+ void transactionFailed(final ShardedDOMDataTreeWriteTransaction tx, final Throwable throwable) {
+ LOG.debug("Transaction {} failed", tx.getIdentifier(), throwable);
+
+ tx.onTransactionFailure(throwable);
+ processNextTransaction(tx);
+ }
+
+ private void processCurrentTransaction() {
+ final ShardedDOMDataTreeWriteTransaction current = CURRENT_UPDATER.getAndSet(this, null);
+ if (current != null) {
+ submitTransaction(current);
+ }
}
- synchronized void transactionSubmitted(final ShardedDOMDataTreeWriteTransaction transaction) {
- Preconditions.checkState(openTx.equals(transaction));
- openTx = null;
+ private synchronized void processNextTransaction(final ShardedDOMDataTreeWriteTransaction tx) {
+ final boolean wasLast = LAST_UPDATER.compareAndSet(this, tx, null);
+ if (wasLast) {
+ processCurrentTransaction();
+ }
}
synchronized void boundToListener(final ShardedDOMDataTreeListenerContext<?> listener) {
- // FIXME: Add option to dettach
- Preconditions.checkState(this.attachedListener == null,
- "Producer %s is already attached to other listener.",
+ // FIXME: Add option to detach
+ Preconditions.checkState(this.attachedListener == null, "Producer %s is already attached to other listener.",
listener.getListener());
this.attachedListener = listener;
}