X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FWriteTransactionsHandler.java;h=a026e6f2f3b00f8c536d099616acbff0962df356;hp=fd47b7176b2411b2bb629d65c56c5a1bac7474af;hb=640c1a8a610811c7d9b7d744d39fd37197fa3b63;hpb=bb61cf2bfc27e04d157f08fac3198fda532cebd6 diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java index fd47b7176b..a026e6f2f3 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.clustering.it.provider.impl; +import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -19,13 +20,15 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.SplittableRandom; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; +import java.util.concurrent.TimeoutException; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; @@ -40,10 +43,13 @@ import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,54 +61,55 @@ public class WriteTransactionsHandler implements Runnable { private static final int MAX_ITEM = 1048576; private static final QName ID_INTS = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern(); + private static final QName ID_INT = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern(); private static final QName ID = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern(); private static final QName ITEM = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern(); private static final QName NUMBER = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern(); public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS); + public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized(); - private final DOMDataBroker domDataBroker; - private final Long timeToTake; - private final Long delay; - private final String id; private final WriteTransactionsInput input; private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final ArrayList> futures = new ArrayList<>(); + private final List> futures = new ArrayList<>(); private final Set usedValues = new HashSet<>(); private RandomnessProvider random; private TxProvider txProvider; - private long startTime; + private final DOMDataBroker domDataBroker; + private final Long runtimeNanos; + private final Long delayNanos; + private final String id; + private SettableFuture> completionFuture; + private Stopwatch stopwatch; private long allTx = 0; private long insertTx = 0; private long deleteTx = 0; private ScheduledFuture scheduledFuture; - private YangInstanceIdentifier idListWithKey; + private YangInstanceIdentifier idListItem; public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) { this.domDataBroker = domDataBroker; this.input = input; - timeToTake = input.getSeconds() * SECOND_AS_NANO; - delay = SECOND_AS_NANO / input.getTransactionsPerSecond(); + runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds()); + delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond(); id = input.getId(); } @Override public void run() { - final long current = System.nanoTime(); - - futures.add(execWrite()); - - maybeFinish(current); + futures.add(execWrite(futures.size())); + maybeFinish(); } public void start(final SettableFuture> settableFuture) { @@ -117,9 +124,9 @@ public class WriteTransactionsHandler implements Runnable { } if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) { - startTime = System.nanoTime(); + stopwatch = Stopwatch.createStarted(); completionFuture = settableFuture; - scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS); + scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS); } else { executor.shutdown(); } @@ -127,60 +134,71 @@ public class WriteTransactionsHandler implements Runnable { private boolean ensureListExists(final SettableFuture> settableFuture) { - final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id) + final ContainerNode containerNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(ID_INTS)) + .withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build()) + .build(); + + DOMDataWriteTransaction tx = txProvider.createTransaction(); + // write only the top list + tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode); + try { + tx.submit().checkedGet(125, TimeUnit.SECONDS); + } catch (final OptimisticLockFailedException e) { + // when multiple write-transactions are executed concurrently we need to ignore this. + // If we get optimistic lock here it means id-ints already exists and we can continue. + LOG.debug("Got an optimistic lock when writing initial top level list element.", e); + } catch (final TransactionCommitFailedException | TimeoutException e) { + LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); + settableFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); + return false; + } + + final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id) .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) .build(); - final MapNode mapNode = - ImmutableNodes.mapNodeBuilder(ID_INTS) - .withChild(entry) - .build(); - final DOMDataWriteTransaction tx = txProvider.createTransaction(); - idListWithKey = ID_INTS_YID.node(entry.getIdentifier()); - tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, mapNode); + idListItem = ID_INT_YID.node(entry.getIdentifier()); + tx = txProvider.createTransaction(); + tx.merge(LogicalDatastoreType.CONFIGURATION, idListItem, entry); try { - tx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { + tx.submit().checkedGet(125, TimeUnit.SECONDS); + return true; + } catch (final Exception e) { LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); settableFuture.set(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); return false; } - - return true; } private boolean fillInitialList(final SettableFuture> settableFuture) { LOG.debug("Filling the item list with initial values."); final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM); - for (int i = 0; i < MAX_ITEM / 2; i++) { - usedValues.add(i); - mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i)); - } - final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM); + final YangInstanceIdentifier itemListId = idListItem.node(ITEM); final DOMDataWriteTransaction tx = txProvider.createTransaction(); tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build()); try { - tx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { + tx.submit().checkedGet(125, TimeUnit.SECONDS); + return true; + } catch (final Exception e) { LOG.warn("Unable to fill the initial item list.", e); settableFuture.set(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); return false; } - - return true; } - private CheckedFuture execWrite() { + private ListenableFuture execWrite(final int offset) { final int i = random.nextInt(MAX_ITEM + 1); final YangInstanceIdentifier entryId = - idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i)); + idListItem.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i)); final DOMDataWriteTransaction tx = txProvider.createTransaction(); allTx++; @@ -199,42 +217,80 @@ public class WriteTransactionsHandler implements Runnable { usedValues.add(i); } - return tx.submit(); + final ListenableFuture future = tx.submit(); + if (LOG.isDebugEnabled()) { + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + LOG.debug("Future #{} completed successfully", offset); + } + + @Override + public void onFailure(final Throwable cause) { + LOG.debug("Future #{} failed", offset, cause); + } + }); + } + + return future; } - private void maybeFinish(final long current) { - if ((current - startTime) > timeToTake) { + private void maybeFinish() { + final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); + if (elapsed >= runtimeNanos) { LOG.debug("Reached max running time, waiting for futures to complete."); scheduledFuture.cancel(false); final ListenableFuture> allFutures = Futures.allAsList(futures); - Futures.addCallback(allFutures, new FutureCallback>() { - @Override - public void onSuccess(@Nullable final List result) { - LOG.debug("All futures completed successfully."); + try { + // Timeout from cds should be 2 minutes so leave some leeway. + allFutures.get(125, TimeUnit.SECONDS); - final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder() - .setAllTx(allTx) - .setInsertTx(insertTx) - .setDeleteTx(deleteTx) - .build(); + LOG.debug("All futures completed successfully."); - completionFuture.set(RpcResultBuilder.success() - .withResult(output).build()); + final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder() + .setAllTx(allTx) + .setInsertTx(insertTx) + .setDeleteTx(deleteTx) + .build(); - executor.shutdown(); + completionFuture.set(RpcResultBuilder.success() + .withResult(output).build()); + + executor.shutdown(); + } catch (final ExecutionException e) { + LOG.error("Write transactions failed.", e.getCause()); + + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build()); + } catch (InterruptedException | TimeoutException e) { + LOG.error("Write transactions failed.", e); + + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, + "Final submit was timed out by the test provider or was interrupted", e).build()); + + for (int i = 0; i < futures.size(); i++) { + final ListenableFuture future = futures.get(i); + + try { + future.get(0, TimeUnit.NANOSECONDS); + } catch (final TimeoutException fe) { + LOG.warn("Future #{}/{} not completed yet", i, futures.size()); + } catch (final ExecutionException fe) { + LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause()); + } catch (final InterruptedException fe) { + LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e); + } } + } catch (Exception exception) { + LOG.error("Write transactions failed.", exception); + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build()); - @Override - public void onFailure(final Throwable t) { - LOG.error("Write transactions failed.", t); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build()); - - executor.shutdown(); - } - }); + executor.shutdown(); + } } }