package org.opendaylight.mdsal.dom.broker;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
class ShardedDOMDataTreeProducer implements DOMDataTreeProducer {
private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTreeProducer.class);
+
private final Set<DOMDataTreeIdentifier> subtrees;
private final ShardedDOMDataTree dataTree;
private BiMap<DOMDataTreeIdentifier, DOMDataTreeShardProducer> idToProducer = ImmutableBiMap.of();
private Map<DOMDataTreeIdentifier, DOMDataTreeShard> idToShard;
- @GuardedBy("this")
- private DOMDataTreeCursorAwareTransaction openTx;
+ private static final AtomicReferenceFieldUpdater<ShardedDOMDataTreeProducer, ShardedDOMDataTreeWriteTransaction>
+ CURRENT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class,
+ ShardedDOMDataTreeWriteTransaction.class, "currentTx");
+ @SuppressWarnings("unused")
+ private volatile ShardedDOMDataTreeWriteTransaction currentTx;
+
+ private static final AtomicReferenceFieldUpdater<ShardedDOMDataTreeProducer, ShardedDOMDataTreeWriteTransaction>
+ OPEN_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class,
+ ShardedDOMDataTreeWriteTransaction.class, "openTx");
+ private volatile ShardedDOMDataTreeWriteTransaction openTx;
+
+ private static final AtomicReferenceFieldUpdater<ShardedDOMDataTreeProducer, ShardedDOMDataTreeWriteTransaction>
+ LAST_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class,
+ ShardedDOMDataTreeWriteTransaction.class, "lastTx");
+ private volatile ShardedDOMDataTreeWriteTransaction lastTx;
+
@GuardedBy("this")
private Map<DOMDataTreeIdentifier, DOMDataTreeProducer> children = Collections.emptyMap();
@GuardedBy("this")
idToShard = ImmutableMap.copyOf(shardMap);
}
- private BiMap<DOMDataTreeIdentifier, DOMDataTreeShardProducer> mapIdsToProducer(final Multimap<DOMDataTreeShard,
- DOMDataTreeIdentifier> shardToId) {
+ private static BiMap<DOMDataTreeIdentifier, DOMDataTreeShardProducer> mapIdsToProducer(
+ final Multimap<DOMDataTreeShard, DOMDataTreeIdentifier> shardToId) {
final Builder<DOMDataTreeIdentifier, DOMDataTreeShardProducer> idToProducerBuilder = ImmutableBiMap.builder();
for (final Entry<DOMDataTreeShard, Collection<DOMDataTreeIdentifier>> entry : shardToId.asMap().entrySet()) {
if (entry.getKey() instanceof WriteableDOMDataTreeShard) {
Preconditions.checkState(!closed, "Producer is already closed");
Preconditions.checkState(openTx == null, "Transaction %s is still open", openTx);
- this.openTx = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, children);
+ LOG.debug("Creating transaction from producer");
+ final ShardedDOMDataTreeWriteTransaction current = CURRENT_UPDATER.getAndSet(this, null);
+ final ShardedDOMDataTreeWriteTransaction ret;
+ if (isolated) {
+ // Isolated case. If we have a previous transaction, submit it before returning this one.
+ if (current != null) {
+ submitTransaction(current);
+ }
+ ret = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, children, true);
+ } else {
+ // Non-isolated case, see if we can reuse the transaction
+ if (current != null) {
+ LOG.debug("Reusing previous transaction {} since there is still a transaction inflight",
+ current.getIdentifier());
+ ret = current;
+ } else {
+ ret = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, children, false);
+ }
+ }
+
+ final boolean success = OPEN_UPDATER.compareAndSet(this, null, ret);
+ Verify.verify(success);
+ return ret;
+ }
- return openTx;
+ private void submitTransaction(final ShardedDOMDataTreeWriteTransaction current) {
+ lastTx = current;
+ current.doSubmit(this::transactionSuccessful, this::transactionFailed);
}
@GuardedBy("this")
return subtrees;
}
- synchronized void cancelTransaction(final ShardedDOMDataTreeWriteTransaction transaction) {
- if (!openTx.equals(transaction)) {
+ void cancelTransaction(final ShardedDOMDataTreeWriteTransaction transaction) {
+ final boolean success = OPEN_UPDATER.compareAndSet(this, transaction, null);
+ if (success) {
+ LOG.debug("Transaction {} cancelled", transaction);
+ } else {
LOG.warn("Transaction {} is not open in producer {}", transaction, this);
- return;
}
+ }
+
+ void processTransaction(final ShardedDOMDataTreeWriteTransaction transaction) {
+ final boolean wasOpen = OPEN_UPDATER.compareAndSet(this, transaction, null);
+ Verify.verify(wasOpen);
+
+ if (lastTx != null) {
+ final boolean success = CURRENT_UPDATER.compareAndSet(this, null, transaction);
+ Verify.verify(success);
+ if (lastTx == null) {
+ // Dispatch after requeue
+ processCurrentTransaction();
+ }
+ } else {
+ submitTransaction(transaction);
+ }
+ }
+
+ void transactionSuccessful(final ShardedDOMDataTreeWriteTransaction tx, final Void result) {
+ LOG.debug("Transaction {} completed successfully", tx.getIdentifier());
- LOG.debug("Transaction {} cancelled", transaction);
- openTx = null;
+ tx.onTransactionSuccess(result);
+ processNextTransaction(tx);
}
- synchronized void transactionSubmitted(final ShardedDOMDataTreeWriteTransaction transaction) {
- Preconditions.checkState(openTx.equals(transaction));
- openTx = null;
+ void transactionFailed(final ShardedDOMDataTreeWriteTransaction tx, final Throwable throwable) {
+ LOG.debug("Transaction {} failed", tx.getIdentifier(), throwable);
+
+ tx.onTransactionFailure(throwable);
+ processNextTransaction(tx);
+ }
+
+ private void processCurrentTransaction() {
+ final ShardedDOMDataTreeWriteTransaction current = CURRENT_UPDATER.getAndSet(this, null);
+ if (current != null) {
+ submitTransaction(current);
+ }
+ }
+
+ private void processNextTransaction(final ShardedDOMDataTreeWriteTransaction tx) {
+ final boolean wasLast = LAST_UPDATER.compareAndSet(this, tx, null);
+ if (wasLast) {
+ processCurrentTransaction();
+ }
}
synchronized void boundToListener(final ShardedDOMDataTreeListenerContext<?> listener) {
- // FIXME: Add option to dettach
- Preconditions.checkState(this.attachedListener == null,
- "Producer %s is already attached to other listener.",
+ // FIXME: Add option to detach
+ Preconditions.checkState(this.attachedListener == null, "Producer %s is already attached to other listener.",
listener.getListener());
this.attachedListener = listener;
}