From: Robert Varga Date: Thu, 15 Jun 2017 08:39:55 +0000 (+0200) Subject: Cleanup ProduceTransactionsHandler X-Git-Tag: release/nitrogen~72 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=640c1a8a610811c7d9b7d744d39fd37197fa3b63;hp=915867f38e525e6e0735523f28523f5e13ee1cd7 Cleanup ProduceTransactionsHandler Shuffle invariants around to reduce overheads. Also adds better debugs around futures completing. Change-Id: I01f940de08e9e0b7fc0e95b48b2d5fecdfd78f86 Signed-off-by: Robert Varga (cherry picked from commit 9797fc8e587a51395342586bc44de9750fb67af3) --- diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java index cfd59a5251..2a1f5ae47d 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java @@ -8,7 +8,8 @@ package org.opendaylight.controller.clustering.it.provider.impl; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -25,7 +26,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; @@ -44,7 +44,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdent import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,65 +55,61 @@ public class ProduceTransactionsHandler implements Runnable { private static final int MAX_ITEM = 1048576; static final QName ID_INTS = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern(); public static final QName ID_INT = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern(); static final QName ID = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern(); static final QName ITEM = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern(); private static final QName NUMBER = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number".intern()); public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS); - public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT); - - private final DOMDataTreeService domDataTreeService; - - private final long timeToTake; - private final long delay; - private final String id; + public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized(); private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final ArrayList> futures = new ArrayList<>(); + private final List> futures = new ArrayList<>(); private final Set usedValues = new HashSet<>(); private final SplittableRandom random = new SplittableRandom(); - private long startTime; + private final DOMDataTreeService domDataTreeService; + private final long runtimeNanos; + private final long delayNanos; + private final String id; + private SettableFuture> completionFuture; + private Stopwatch stopwatch; private long allTx = 0; private long insertTx = 0; private long deleteTx = 0; private ScheduledFuture scheduledFuture; private DOMDataTreeProducer itemProducer; - private YangInstanceIdentifier idListWithKey; + private DOMDataTreeIdentifier idListItem; public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService, final ProduceTransactionsInput input) { this.domDataTreeService = domDataTreeService; - timeToTake = input.getSeconds() * SECOND_AS_NANO; - delay = SECOND_AS_NANO / input.getTransactionsPerSecond(); + runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds()); + delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond(); id = input.getId(); } @Override public void run() { - final long current = System.nanoTime(); - - futures.add(execWrite()); - - maybeFinish(current); + futures.add(execWrite(futures.size())); + maybeFinish(); } public void start(final SettableFuture> settableFuture) { completionFuture = settableFuture; if (fillInitialList(completionFuture)) { - startTime = System.nanoTime(); - scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS); + stopwatch = Stopwatch.createStarted(); + scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS); } else { executor.shutdown(); } @@ -123,8 +118,7 @@ public class ProduceTransactionsHandler implements Runnable { private boolean fillInitialList(final SettableFuture> settableFuture) { LOG.debug("Filling the item list with initial values."); - final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM); - idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id)); + final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id)); itemProducer = domDataTreeService.createProducer( Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); @@ -133,54 +127,77 @@ public class ProduceTransactionsHandler implements Runnable { final DOMDataTreeWriteCursor cursor = tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)); - final MapNode list = mapBuilder.build(); + final MapNode list = ImmutableNodes.mapNodeBuilder(ITEM).build(); cursor.write(list.getIdentifier(), list); cursor.close(); + idListItem = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, + idListWithKey.node(list.getIdentifier()).toOptimized()); + try { tx.submit().checkedGet(125, TimeUnit.SECONDS); + return true; } catch (final Exception e) { LOG.warn("Unable to fill the initial item list.", e); settableFuture.set(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; } - return true; + try { + itemProducer.close(); + } catch (final DOMDataTreeProducerException exception) { + LOG.warn("Failure while closing producer.", exception); + } + return false; } - private CheckedFuture execWrite() { + private ListenableFuture execWrite(final int offset) { final int i = random.nextInt(MAX_ITEM + 1); - - final YangInstanceIdentifier entryId = - idListWithKey.node(ITEM).node(new NodeIdentifierWithPredicates(ITEM, NUMBER, i)); - final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); - final DOMDataTreeWriteCursor cursor = tx.createCursor( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(ITEM))); + final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem); + allTx++; + final NodeIdentifierWithPredicates entryId = new NodeIdentifierWithPredicates(ITEM, NUMBER, i); if (usedValues.contains(i)) { LOG.debug("Deleting item: {}", i); deleteTx++; - cursor.delete(entryId.getLastPathArgument()); + cursor.delete(entryId); usedValues.remove(i); } else { LOG.debug("Inserting item: {}", i); insertTx++; - final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i); - cursor.write(entryId.getLastPathArgument(), entry); + + final MapEntryNode entry = ImmutableNodes.mapEntryBuilder().withNodeIdentifier(entryId) + .withChild(ImmutableNodes.leafNode(NUMBER, i)).build(); + cursor.write(entryId, entry); usedValues.add(i); } cursor.close(); - return tx.submit(); + final ListenableFuture future = tx.submit(); + if (LOG.isDebugEnabled()) { + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + LOG.debug("Future #{} completed successfully", offset); + } + + @Override + public void onFailure(final Throwable cause) { + LOG.debug("Future #{} failed", offset, cause); + } + }); + } + + return future; } - private void maybeFinish(final long current) { - if ((current - startTime) > timeToTake) { + private void maybeFinish() { + final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); + if (elapsed >= runtimeNanos) { LOG.debug("Reached max running time, waiting for futures to complete."); scheduledFuture.cancel(false); @@ -197,31 +214,42 @@ public class ProduceTransactionsHandler implements Runnable { .setInsertTx(insertTx) .setDeleteTx(deleteTx) .build(); - - completionFuture.set(RpcResultBuilder.success() .withResult(output).build()); - - executor.shutdown(); - } catch (Exception exception) { - LOG.error("Write transactions failed.", exception); + } catch (ExecutionException e) { + LOG.error("Write transactions failed.", e.getCause()); + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build()); + } catch (InterruptedException | TimeoutException e) { + LOG.error("Write transactions failed.", e); completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build()); + .withError(RpcError.ErrorType.APPLICATION, + "Final submit was timed out by the test provider or was interrupted", e).build()); for (int i = 0; i < futures.size(); i++) { - final CheckedFuture future = futures.get(i); - if (!future.isDone()) { - LOG.warn("Future #{}/{} possibly hanged.", future, futures.size()); + final ListenableFuture future = futures.get(i); + + try { + future.get(0, TimeUnit.NANOSECONDS); + } catch (TimeoutException fe) { + LOG.warn("Future #{}/{} not completed yet", i, futures.size()); + } catch (ExecutionException fe) { + LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause()); + } catch (InterruptedException fe) { + LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e); } } + } catch (Exception e) { + LOG.error("Write transactions failed.", e); + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); + } - executor.shutdown(); - } finally { - try { - itemProducer.close(); - } catch (final DOMDataTreeProducerException e) { - LOG.warn("Failure while closing item producer.", e); - } + executor.shutdown(); + try { + itemProducer.close(); + } catch (final DOMDataTreeProducerException e) { + LOG.warn("Failure while closing item producer.", e); } } } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java index 6ecb8c8381..a026e6f2f3 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java @@ -8,7 +8,9 @@ package org.opendaylight.controller.clustering.it.provider.impl; +import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -59,57 +61,55 @@ public class WriteTransactionsHandler implements Runnable { private static final int MAX_ITEM = 1048576; private static final QName ID_INTS = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern(); private static final QName ID_INT = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern(); private static final QName ID = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern(); private static final QName ITEM = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern(); private static final QName NUMBER = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number"); + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern(); public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS); - public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT); + public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized(); - private final DOMDataBroker domDataBroker; - private final Long timeToTake; - private final Long delay; - private final String id; private final WriteTransactionsInput input; private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final ArrayList> futures = new ArrayList<>(); + private final List> futures = new ArrayList<>(); private final Set usedValues = new HashSet<>(); private RandomnessProvider random; private TxProvider txProvider; - private long startTime; + private final DOMDataBroker domDataBroker; + private final Long runtimeNanos; + private final Long delayNanos; + private final String id; + private SettableFuture> completionFuture; + private Stopwatch stopwatch; private long allTx = 0; private long insertTx = 0; private long deleteTx = 0; private ScheduledFuture scheduledFuture; - private YangInstanceIdentifier idListWithKey; + private YangInstanceIdentifier idListItem; public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) { this.domDataBroker = domDataBroker; this.input = input; - timeToTake = input.getSeconds() * SECOND_AS_NANO; - delay = SECOND_AS_NANO / input.getTransactionsPerSecond(); + runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds()); + delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond(); id = input.getId(); } @Override public void run() { - final long current = System.nanoTime(); - - futures.add(execWrite()); - - maybeFinish(current); + futures.add(execWrite(futures.size())); + maybeFinish(); } public void start(final SettableFuture> settableFuture) { @@ -124,9 +124,9 @@ public class WriteTransactionsHandler implements Runnable { } if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) { - startTime = System.nanoTime(); + stopwatch = Stopwatch.createStarted(); completionFuture = settableFuture; - scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS); + scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS); } else { executor.shutdown(); } @@ -159,20 +159,19 @@ public class WriteTransactionsHandler implements Runnable { .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) .build(); - idListWithKey = ID_INT_YID.node(entry.getIdentifier()); + idListItem = ID_INT_YID.node(entry.getIdentifier()); tx = txProvider.createTransaction(); - tx.merge(LogicalDatastoreType.CONFIGURATION, idListWithKey, entry); + tx.merge(LogicalDatastoreType.CONFIGURATION, idListItem, entry); try { tx.submit().checkedGet(125, TimeUnit.SECONDS); + return true; } catch (final Exception e) { LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); settableFuture.set(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); return false; } - - return true; } private boolean fillInitialList(final SettableFuture> settableFuture) { @@ -180,27 +179,26 @@ public class WriteTransactionsHandler implements Runnable { final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM); - final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM); + final YangInstanceIdentifier itemListId = idListItem.node(ITEM); final DOMDataWriteTransaction tx = txProvider.createTransaction(); tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build()); try { tx.submit().checkedGet(125, TimeUnit.SECONDS); + return true; } catch (final Exception e) { LOG.warn("Unable to fill the initial item list.", e); settableFuture.set(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); return false; } - - return true; } - private CheckedFuture execWrite() { + private ListenableFuture execWrite(final int offset) { final int i = random.nextInt(MAX_ITEM + 1); final YangInstanceIdentifier entryId = - idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i)); + idListItem.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i)); final DOMDataWriteTransaction tx = txProvider.createTransaction(); allTx++; @@ -219,11 +217,27 @@ public class WriteTransactionsHandler implements Runnable { usedValues.add(i); } - return tx.submit(); + final ListenableFuture future = tx.submit(); + if (LOG.isDebugEnabled()) { + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + LOG.debug("Future #{} completed successfully", offset); + } + + @Override + public void onFailure(final Throwable cause) { + LOG.debug("Future #{} failed", offset, cause); + } + }); + } + + return future; } - private void maybeFinish(final long current) { - if ((current - startTime) > timeToTake) { + private void maybeFinish() { + final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); + if (elapsed >= runtimeNanos) { LOG.debug("Reached max running time, waiting for futures to complete."); scheduledFuture.cancel(false); @@ -245,17 +259,35 @@ public class WriteTransactionsHandler implements Runnable { .withResult(output).build()); executor.shutdown(); - } catch (Exception exception) { - LOG.error("Write transactions failed.", exception); + } catch (final ExecutionException e) { + LOG.error("Write transactions failed.", e.getCause()); + completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build()); + .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build()); + } catch (InterruptedException | TimeoutException e) { + LOG.error("Write transactions failed.", e); + + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, + "Final submit was timed out by the test provider or was interrupted", e).build()); for (int i = 0; i < futures.size(); i++) { - final CheckedFuture future = futures.get(i); - if (!future.isDone()) { - LOG.warn("Future #{}/{} possibly hanged.", future, futures.size()); + final ListenableFuture future = futures.get(i); + + try { + future.get(0, TimeUnit.NANOSECONDS); + } catch (final TimeoutException fe) { + LOG.warn("Future #{}/{} not completed yet", i, futures.size()); + } catch (final ExecutionException fe) { + LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause()); + } catch (final InterruptedException fe) { + LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e); } } + } catch (Exception exception) { + LOG.error("Write transactions failed.", exception); + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build()); executor.shutdown(); }