X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FProduceTransactionsHandler.java;h=b1348c2fc42c2fc03f6be77bb9aafe9de864d734;hb=d97061af6814ad7b085af10797a252aa4aa5cda6;hp=2a1f5ae47d24c986d1f9e04844b86f13f4a623f0;hpb=9797fc8e587a51395342586bc44de9750fb67af3;p=controller.git diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java index 2a1f5ae47d..b1348c2fc4 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java @@ -8,23 +8,15 @@ package org.opendaylight.controller.clustering.it.provider.impl; -import com.google.common.base.Stopwatch; -import com.google.common.util.concurrent.FutureCallback; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.SplittableRandom; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; @@ -35,7 +27,6 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -47,81 +38,34 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ProduceTransactionsHandler implements Runnable { - +public class ProduceTransactionsHandler extends AbstractTransactionHandler { private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class); - private static final int SECOND_AS_NANO = 1000000000; - //2^20 as in the model - private static final int MAX_ITEM = 1048576; - - static final QName ID_INTS = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern(); - public static final QName ID_INT = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern(); - static final QName ID = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern(); - static final QName ITEM = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern(); - private static final QName NUMBER = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number".intern()); - - public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS); - public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized(); - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final List> futures = new ArrayList<>(); - private final Set usedValues = new HashSet<>(); + private final SettableFuture> future = SettableFuture.create(); private final SplittableRandom random = new SplittableRandom(); + private final Set usedValues = new HashSet<>(); + private final DOMDataTreeIdentifier idListItem; + private final DOMDataTreeProducer itemProducer; - private final DOMDataTreeService domDataTreeService; - private final long runtimeNanos; - private final long delayNanos; - private final String id; - - private SettableFuture> completionFuture; - private Stopwatch stopwatch; - - private long allTx = 0; private long insertTx = 0; private long deleteTx = 0; - private ScheduledFuture scheduledFuture; - private DOMDataTreeProducer itemProducer; - private DOMDataTreeIdentifier idListItem; - - public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService, - final ProduceTransactionsInput input) { - - this.domDataTreeService = domDataTreeService; - runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds()); - delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond(); - id = input.getId(); - } - - @Override - public void run() { - futures.add(execWrite(futures.size())); - maybeFinish(); + private ProduceTransactionsHandler(final DOMDataTreeProducer producer, final DOMDataTreeIdentifier idListItem, + final ProduceTransactionsInput input) { + super(input); + this.itemProducer = Preconditions.checkNotNull(producer); + this.idListItem = Preconditions.checkNotNull(idListItem); } - public void start(final SettableFuture> settableFuture) { - completionFuture = settableFuture; - - if (fillInitialList(completionFuture)) { - stopwatch = Stopwatch.createStarted(); - scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS); - } else { - executor.shutdown(); - } - } - - private boolean fillInitialList(final SettableFuture> settableFuture) { - LOG.debug("Filling the item list with initial values."); + public static ListenableFuture> start( + final DOMDataTreeService domDataTreeService, final ProduceTransactionsInput input) { + final String id = input.getId(); + LOG.debug("Filling the item list {} with initial values.", id); final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id)); - itemProducer = domDataTreeService.createProducer( - Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); + final DOMDataTreeProducer itemProducer = domDataTreeService.createProducer( + Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); final DOMDataTreeWriteCursor cursor = @@ -131,33 +75,34 @@ public class ProduceTransactionsHandler implements Runnable { cursor.write(list.getIdentifier(), list); cursor.close(); - idListItem = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, - idListWithKey.node(list.getIdentifier()).toOptimized()); - try { - tx.submit().checkedGet(125, TimeUnit.SECONDS); - return true; + tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (final Exception e) { LOG.warn("Unable to fill the initial item list.", e); - settableFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - } - try { - itemProducer.close(); - } catch (final DOMDataTreeProducerException exception) { - LOG.warn("Failure while closing producer.", exception); + try { + itemProducer.close(); + } catch (final DOMDataTreeProducerException exception) { + LOG.warn("Failure while closing producer.", exception); + } + + return Futures.immediateFuture(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); } - return false; + + final ProduceTransactionsHandler handler = new ProduceTransactionsHandler(itemProducer, + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(list.getIdentifier()) + .toOptimized()), input); + handler.doStart(); + return handler.future; } - private ListenableFuture execWrite(final int offset) { + @Override + ListenableFuture execWrite(final long txId) { final int i = random.nextInt(MAX_ITEM + 1); final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem); - allTx++; - final NodeIdentifierWithPredicates entryId = new NodeIdentifierWithPredicates(ITEM, NUMBER, i); if (usedValues.contains(i)) { LOG.debug("Deleting item: {}", i); @@ -177,80 +122,30 @@ public class ProduceTransactionsHandler implements Runnable { cursor.close(); - final ListenableFuture future = tx.submit(); - if (LOG.isDebugEnabled()) { - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - LOG.debug("Future #{} completed successfully", offset); - } - - @Override - public void onFailure(final Throwable cause) { - LOG.debug("Future #{} failed", offset, cause); - } - }); - } - - return future; + return tx.submit(); } - private void maybeFinish() { - final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); - if (elapsed >= runtimeNanos) { - LOG.debug("Reached max running time, waiting for futures to complete."); - scheduledFuture.cancel(false); - - final ListenableFuture> allFutures = Futures.allAsList(futures); - - try { - // Timeout from cds should be 2 minutes so leave some leeway. - allFutures.get(125, TimeUnit.SECONDS); - - LOG.debug("All futures completed successfully."); - - final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder() - .setAllTx(allTx) - .setInsertTx(insertTx) - .setDeleteTx(deleteTx) - .build(); - completionFuture.set(RpcResultBuilder.success() - .withResult(output).build()); - } catch (ExecutionException e) { - LOG.error("Write transactions failed.", e.getCause()); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build()); - } catch (InterruptedException | TimeoutException e) { - LOG.error("Write transactions failed.", e); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, - "Final submit was timed out by the test provider or was interrupted", e).build()); - - for (int i = 0; i < futures.size(); i++) { - final ListenableFuture future = futures.get(i); + @Override + void runFailed(final Throwable cause) { + future.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build()); + } - try { - future.get(0, TimeUnit.NANOSECONDS); - } catch (TimeoutException fe) { - LOG.warn("Future #{}/{} not completed yet", i, futures.size()); - } catch (ExecutionException fe) { - LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause()); - } catch (InterruptedException fe) { - LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e); - } - } - } catch (Exception e) { - LOG.error("Write transactions failed.", e); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - } + @Override + void runSuccessful(final long allTx) { + final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder() + .setAllTx(allTx) + .setInsertTx(insertTx) + .setDeleteTx(deleteTx) + .build(); + future.set(RpcResultBuilder.success() + .withResult(output).build()); + } - executor.shutdown(); - try { - itemProducer.close(); - } catch (final DOMDataTreeProducerException e) { - LOG.warn("Failure while closing item producer.", e); - } - } + @Override + void runTimedOut(final Exception cause) { + future.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, + "Final submit was timed out by the test provider or was interrupted", cause).build()); } }