package org.opendaylight.mdsal.dom.broker;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
class ShardedDOMDataTreeProducer implements DOMDataTreeProducer {
private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTreeProducer.class);
+
private final Set<DOMDataTreeIdentifier> subtrees;
private final ShardedDOMDataTree dataTree;
private BiMap<DOMDataTreeIdentifier, DOMDataTreeShardProducer> idToProducer = ImmutableBiMap.of();
private Map<DOMDataTreeIdentifier, DOMDataTreeShard> idToShard;
- @GuardedBy("this")
- private DOMDataTreeCursorAwareTransaction openTx;
+ private static final AtomicReferenceFieldUpdater<ShardedDOMDataTreeProducer, ShardedDOMDataTreeWriteTransaction>
+ CURRENT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class,
+ ShardedDOMDataTreeWriteTransaction.class, "currentTx");
+ @SuppressWarnings("unused")
+ private volatile ShardedDOMDataTreeWriteTransaction currentTx;
+
+ private static final AtomicReferenceFieldUpdater<ShardedDOMDataTreeProducer, ShardedDOMDataTreeWriteTransaction>
+ OPEN_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class,
+ ShardedDOMDataTreeWriteTransaction.class, "openTx");
+ private volatile ShardedDOMDataTreeWriteTransaction openTx;
+
+ private static final AtomicReferenceFieldUpdater<ShardedDOMDataTreeProducer, ShardedDOMDataTreeWriteTransaction>
+ LAST_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class,
+ ShardedDOMDataTreeWriteTransaction.class, "lastTx");
+ private volatile ShardedDOMDataTreeWriteTransaction lastTx;
+
@GuardedBy("this")
private Map<DOMDataTreeIdentifier, DOMDataTreeProducer> children = Collections.emptyMap();
@GuardedBy("this")
idToShard = ImmutableMap.copyOf(shardMap);
}
- private BiMap<DOMDataTreeIdentifier, DOMDataTreeShardProducer> mapIdsToProducer(final Multimap<DOMDataTreeShard,
- DOMDataTreeIdentifier> shardToId) {
+ private static BiMap<DOMDataTreeIdentifier, DOMDataTreeShardProducer> mapIdsToProducer(
+ final Multimap<DOMDataTreeShard, DOMDataTreeIdentifier> shardToId) {
final Builder<DOMDataTreeIdentifier, DOMDataTreeShardProducer> idToProducerBuilder = ImmutableBiMap.builder();
for (final Entry<DOMDataTreeShard, Collection<DOMDataTreeIdentifier>> entry : shardToId.asMap().entrySet()) {
if (entry.getKey() instanceof WriteableDOMDataTreeShard) {
Preconditions.checkState(!closed, "Producer is already closed");
Preconditions.checkState(openTx == null, "Transaction %s is still open", openTx);
- this.openTx = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, children);
+ LOG.debug("Creating transaction from producer");
+ final ShardedDOMDataTreeWriteTransaction current = CURRENT_UPDATER.getAndSet(this, null);
+ final ShardedDOMDataTreeWriteTransaction ret;
+ if (isolated) {
+ // Isolated case. If we have a previous transaction, submit it before returning this one.
+ if (current != null) {
+ submitTransaction(current);
+ }
+ ret = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, children, true);
+ } else {
+ // Non-isolated case, see if we can reuse the transaction
+ if (current != null) {
+ LOG.debug("Reusing previous transaction {} since there is still a transaction inflight",
+ current.getIdentifier());
+ ret = current;
+ } else {
+ ret = new ShardedDOMDataTreeWriteTransaction(this, idToProducer, children, false);
+ }
+ }
+
+ final boolean success = OPEN_UPDATER.compareAndSet(this, null, ret);
+ Verify.verify(success);
+ return ret;
+ }
- return openTx;
+ private void submitTransaction(final ShardedDOMDataTreeWriteTransaction current) {
+ lastTx = current;
+ current.doSubmit(this::transactionSuccessful, this::transactionFailed);
}
@GuardedBy("this")
return subtrees;
}
- synchronized void cancelTransaction(final ShardedDOMDataTreeWriteTransaction transaction) {
- if (!openTx.equals(transaction)) {
+ void cancelTransaction(final ShardedDOMDataTreeWriteTransaction transaction) {
+ final boolean success = OPEN_UPDATER.compareAndSet(this, transaction, null);
+ if (success) {
+ LOG.debug("Transaction {} cancelled", transaction);
+ } else {
LOG.warn("Transaction {} is not open in producer {}", transaction, this);
- return;
}
+ }
+
+ void processTransaction(final ShardedDOMDataTreeWriteTransaction transaction) {
+ final boolean wasOpen = OPEN_UPDATER.compareAndSet(this, transaction, null);
+ Verify.verify(wasOpen);
+
+ if (lastTx != null) {
+ final boolean success = CURRENT_UPDATER.compareAndSet(this, null, transaction);
+ Verify.verify(success);
+ if (lastTx == null) {
+ // Dispatch after requeue
+ processCurrentTransaction();
+ }
+ } else {
+ submitTransaction(transaction);
+ }
+ }
+
+ void transactionSuccessful(final ShardedDOMDataTreeWriteTransaction tx, final Void result) {
+ LOG.debug("Transaction {} completed successfully", tx.getIdentifier());
- LOG.debug("Transaction {} cancelled", transaction);
- openTx = null;
+ tx.onTransactionSuccess(result);
+ processNextTransaction(tx);
}
- synchronized void transactionSubmitted(final ShardedDOMDataTreeWriteTransaction transaction) {
- Preconditions.checkState(openTx.equals(transaction));
- openTx = null;
+ void transactionFailed(final ShardedDOMDataTreeWriteTransaction tx, final Throwable throwable) {
+ LOG.debug("Transaction {} failed", tx.getIdentifier(), throwable);
+
+ tx.onTransactionFailure(throwable);
+ processNextTransaction(tx);
+ }
+
+ private void processCurrentTransaction() {
+ final ShardedDOMDataTreeWriteTransaction current = CURRENT_UPDATER.getAndSet(this, null);
+ if (current != null) {
+ submitTransaction(current);
+ }
+ }
+
+ private void processNextTransaction(final ShardedDOMDataTreeWriteTransaction tx) {
+ final boolean wasLast = LAST_UPDATER.compareAndSet(this, tx, null);
+ if (wasLast) {
+ processCurrentTransaction();
+ }
}
synchronized void boundToListener(final ShardedDOMDataTreeListenerContext<?> listener) {
- // FIXME: Add option to dettach
- Preconditions.checkState(this.attachedListener == null,
- "Producer %s is already attached to other listener.",
+ // FIXME: Add option to detach
+ Preconditions.checkState(this.attachedListener == null, "Producer %s is already attached to other listener.",
listener.getListener());
this.attachedListener = listener;
}
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
@GuardedBy("this")
private DOMDataTreeWriteCursor openCursor;
+ private final SettableFuture<Void> future = SettableFuture.create();
+ private final CheckedFuture<Void, TransactionCommitFailedException> submitFuture =
+ Futures.makeChecked(future, TransactionCommitFailedExceptionMapper.create("submit"));
+
+ private final boolean isolated;
+
ShardedDOMDataTreeWriteTransaction(final ShardedDOMDataTreeProducer producer,
final Map<DOMDataTreeIdentifier, DOMDataTreeShardProducer> idToProducer,
- final Map<DOMDataTreeIdentifier, DOMDataTreeProducer> childProducers) {
+ final Map<DOMDataTreeIdentifier, DOMDataTreeProducer> childProducers,
+ final boolean isolated) {
+ this.isolated = isolated;
this.producer = Preconditions.checkNotNull(producer);
idToTransaction = new HashMap<>();
Preconditions.checkNotNull(idToProducer).forEach((id, prod) -> idToTransaction.put(
id, prod.createTransaction()));
this.identifier = "SHARDED-DOM-" + COUNTER.getAndIncrement();
+ LOG.debug("Created new transaction{}", identifier);
childProducers.forEach((id, prod) -> childBoundaries.add(id.getRootIdentifier()));
}
Preconditions.checkState(!closed, "Transaction %s is already closed", identifier);
Preconditions.checkState(openCursor == null, "Cannot submit transaction while there is a cursor open");
+ producer.processTransaction(this);
+ return submitFuture;
+ }
+
+ CheckedFuture<Void, TransactionCommitFailedException> doSubmit(
+ BiConsumer<ShardedDOMDataTreeWriteTransaction, Void> success,
+ BiConsumer<ShardedDOMDataTreeWriteTransaction, Throwable> failure) {
+
final Set<DOMDataTreeShardWriteTransaction> txns = ImmutableSet.copyOf(idToTransaction.values());
final ListenableFuture<List<Void>> listListenableFuture =
Futures.allAsList(txns.stream().map(tx -> {
+ LOG.debug("Readying tx {}", identifier);
tx.ready();
return tx.submit();
}).collect(Collectors.toList()));
@Override
public void onSuccess(final List<Void> result) {
ret.set(null);
+ success.accept(ShardedDOMDataTreeWriteTransaction.this, null);
}
@Override
public void onFailure(final Throwable exp) {
ret.setException(exp);
+ failure.accept(ShardedDOMDataTreeWriteTransaction.this, exp);
}
});
- producer.transactionSubmitted(this);
return Futures.makeChecked(ret, TransactionCommitFailedExceptionMapper.create("submit"));
}
+ void onTransactionSuccess(final Void result) {
+ future.set(result);
+ }
+
+ void onTransactionFailure(final Throwable throwable) {
+ future.setException(throwable);
+ }
+
synchronized void cursorClosed() {
openCursor = null;
}
+ boolean isIsolated() {
+ return isolated;
+ }
+
private class DelegatingCursor implements DOMDataTreeWriteCursor {
private final DOMDataTreeWriteCursor delegate;
+ private final DOMDataTreeIdentifier rootPosition;
private final Deque<PathArgument> path = new LinkedList<>();
DelegatingCursor(final DOMDataTreeWriteCursor delegate, final DOMDataTreeIdentifier rootPosition) {
- this.delegate = delegate;
+ this.delegate = Preconditions.checkNotNull(delegate);
+ this.rootPosition = Preconditions.checkNotNull(rootPosition);
path.addAll(rootPosition.getRootIdentifier().getPathArguments());
}
@Override
public void close() {
+ int depthEntered = path.size() - rootPosition.getRootIdentifier().getPathArguments().size();
+ if (depthEntered > 0) {
+ // clean up existing modification cursor in case this tx will be reused for batching
+ delegate.exit(depthEntered);
+ }
+
delegate.close();
cursorClosed();
}
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
import org.opendaylight.mdsal.dom.broker.util.TestModel;
import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataTreeShard;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.parser.spi.meta.ReactorException;
import org.slf4j.Logger;
verifyNoMoreInteractions(mockedDataTreeListener);
}
+ @Test
+ public void testMultipleWritesIntoSingleMapEntry() throws Exception {
+
+ final YangInstanceIdentifier oid1 = TestModel.OUTER_LIST_PATH.node(new NodeIdentifierWithPredicates(
+ TestModel.OUTER_LIST_QNAME, QName.create(TestModel.OUTER_LIST_QNAME, "id"), 0));
+ final DOMDataTreeIdentifier outerListPath = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1);
+
+ final DOMDataTreeProducer shardProducer = dataTreeService.createProducer(
+ Collections.singletonList(outerListPath));
+ final InMemoryDOMDataTreeShard outerListShard = InMemoryDOMDataTreeShard.create(outerListPath, executor, 1000);
+ outerListShard.onGlobalContextUpdated(schemaContext);
+
+ final ListenerRegistration<InMemoryDOMDataTreeShard> oid1ShardRegistration =
+ dataTreeService.registerDataTreeShard(outerListPath, outerListShard, shardProducer);
+
+ final DOMDataTreeCursorAwareTransaction tx = shardProducer.createTransaction(false);
+ final DOMDataTreeWriteCursor cursor =
+ tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1));
+ assertNotNull(cursor);
+
+ MapNode innerList = ImmutableMapNodeBuilder
+ .create()
+ .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME))
+ .build();
+
+ cursor.write(new NodeIdentifier(TestModel.INNER_LIST_QNAME), innerList);
+ cursor.close();
+ tx.submit().checkedGet();
+
+ final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
+ final Collection<MapEntryNode> innerListMapEntries = createInnerListMapEntries(1000, "run-1");
+ for (final MapEntryNode innerListMapEntry : innerListMapEntries) {
+ final DOMDataTreeCursorAwareTransaction tx1 = shardProducer.createTransaction(false);
+ final DOMDataTreeWriteCursor cursor1 = tx1.createCursor(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
+ oid1.node(new NodeIdentifier(TestModel.INNER_LIST_QNAME))));
+ cursor1.write(innerListMapEntry.getIdentifier(), innerListMapEntry);
+ cursor1.close();
+ futures.add(tx1.submit());
+ }
+
+ futures.get(futures.size() - 1).checkedGet();
+
+ }
+
+ private Collection<MapEntryNode> createInnerListMapEntries(int amount, String valuePrefix) {
+ final Collection<MapEntryNode> ret = new ArrayList<>();
+ for (int i = 0; i < amount; i++) {
+ ret.add(ImmutableNodes.mapEntryBuilder()
+ .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.INNER_LIST_QNAME,
+ QName.create(TestModel.OUTER_LIST_QNAME, "name"), Integer.toString(i)))
+ .withChild(ImmutableNodes
+ .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i)))
+ .withChild(ImmutableNodes
+ .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i))
+ .build());
+ }
+
+ return ret;
+ }
+
@Test
public void testMultipleProducerCursorCreation() throws Exception {
@Override
public InmemoryDOMDataTreeShardWriteTransaction createTransaction() {
- Preconditions.checkState(currentTx == null || currentTx.isFinished(), "Previous transaction not finished yet.");
+// Preconditions.checkState(currentTx == null || currentTx.isFinished(), "Previous transaction not finished yet.");
if (lastSubmittedTx != null) {
currentTx = parentShard.createTransaction(lastSubmittedTx);
} else {
void cursorClosed() {
Preconditions.checkNotNull(cursor);
+ modification.closeCursor();
cursor = null;
}
return submit;
}
- public void followUp() {
-
- }
-
@Override
public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
Preconditions.checkState(!finished, "Transaction is finished/closed already.");
}
}
+ void closeCursor() {
+ rootContext.closeCursor();
+ }
}
\ No newline at end of file
return ret;
}
+
+ void closeCursor() {
+ cursor.close();
+ cursor = null;
+ }
}
final DataTreeModificationCursor dataTreeModificationCursor = mock(DataTreeModificationCursor.class);
doReturn(DataTreeModificationCursorAdaptor.of( dataTreeModificationCursor))
.when(SHARD_ROOT_MODIFICATION_CONTEXT).cursor();
+ doNothing().when(SHARD_ROOT_MODIFICATION_CONTEXT).closeCursor();
final DataTreeCandidate dataTreeCandidate = mock(DataTreeCandidate.class);
final DataTreeCandidateNode dataTreeCandidateNode = mock(DataTreeCandidateNode.class);
doReturn(dataTreeCandidateNode).when(dataTreeCandidate).getRootNode();