X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FProduceTransactionsHandler.java;h=b1348c2fc42c2fc03f6be77bb9aafe9de864d734;hb=d97061af6814ad7b085af10797a252aa4aa5cda6;hp=81d783712489388fe7b8f76b6c2a263169efb7ba;hpb=31a52c56cb4e8398403f299d0c3d3830084e260e;p=controller.git diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java index 81d7837124..b1348c2fc4 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java @@ -8,24 +8,16 @@ package org.opendaylight.controller.clustering.it.provider.impl; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.SplittableRandom; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; @@ -35,7 +27,6 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -44,139 +35,88 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdent import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ProduceTransactionsHandler implements Runnable { - +public class ProduceTransactionsHandler extends AbstractTransactionHandler { private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class); - private static final int SECOND_AS_NANO = 1000000000; - //2^20 as in the model - private static final int MAX_ITEM = 1048576; - - static final QName ID_INTS = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints"); - public static final QName ID_INT = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int"); - static final QName ID = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id"); - static final QName ITEM = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item"); - private static final QName NUMBER = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number"); - - public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS); - public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT); - - private final DOMDataTreeService domDataTreeService; - - private final long timeToTake; - private final long delay; - private final String id; - - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final ArrayList> futures = new ArrayList<>(); - private final Set usedValues = new HashSet<>(); - private final SplittableRandom random = new SplittableRandom(); - private long startTime; - private SettableFuture> completionFuture; + private final SettableFuture> future = SettableFuture.create(); + private final SplittableRandom random = new SplittableRandom(); + private final Set usedValues = new HashSet<>(); + private final DOMDataTreeIdentifier idListItem; + private final DOMDataTreeProducer itemProducer; - private long allTx = 0; private long insertTx = 0; private long deleteTx = 0; - private ScheduledFuture scheduledFuture; - private DOMDataTreeProducer itemProducer; - private YangInstanceIdentifier idListWithKey; - - public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService, - final ProduceTransactionsInput input) { - - this.domDataTreeService = domDataTreeService; - - timeToTake = input.getSeconds() * SECOND_AS_NANO; - delay = SECOND_AS_NANO / input.getTransactionsPerSecond(); - id = input.getId(); - } - - @Override - public void run() { - final long current = System.nanoTime(); - - futures.add(execWrite()); - - maybeFinish(current); - } - - public void start(final SettableFuture> settableFuture) { - completionFuture = settableFuture; - if (fillInitialList(completionFuture)) { - startTime = System.nanoTime(); - scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS); - } else { - executor.shutdown(); - } + private ProduceTransactionsHandler(final DOMDataTreeProducer producer, final DOMDataTreeIdentifier idListItem, + final ProduceTransactionsInput input) { + super(input); + this.itemProducer = Preconditions.checkNotNull(producer); + this.idListItem = Preconditions.checkNotNull(idListItem); } - private boolean fillInitialList(final SettableFuture> settableFuture) { - LOG.debug("Filling the item list with initial values."); + public static ListenableFuture> start( + final DOMDataTreeService domDataTreeService, final ProduceTransactionsInput input) { + final String id = input.getId(); + LOG.debug("Filling the item list {} with initial values.", id); - final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM); - idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id)); + final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id)); - itemProducer = domDataTreeService.createProducer( - Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); + final DOMDataTreeProducer itemProducer = domDataTreeService.createProducer( + Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); final DOMDataTreeWriteCursor cursor = tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)); - final MapNode list = mapBuilder.build(); + final MapNode list = ImmutableNodes.mapNodeBuilder(ITEM).build(); cursor.write(list.getIdentifier(), list); cursor.close(); try { - tx.submit().checkedGet(125, TimeUnit.SECONDS); + tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (final Exception e) { LOG.warn("Unable to fill the initial item list.", e); - settableFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); try { itemProducer.close(); } catch (final DOMDataTreeProducerException exception) { LOG.warn("Failure while closing producer.", exception); } - return false; + + return Futures.immediateFuture(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); } - return true; + final ProduceTransactionsHandler handler = new ProduceTransactionsHandler(itemProducer, + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(list.getIdentifier()) + .toOptimized()), input); + handler.doStart(); + return handler.future; } - private CheckedFuture execWrite() { + @Override + ListenableFuture execWrite(final long txId) { final int i = random.nextInt(MAX_ITEM + 1); - - final YangInstanceIdentifier entryId = - idListWithKey.node(ITEM).node(new NodeIdentifierWithPredicates(ITEM, NUMBER, i)); - final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); - final DOMDataTreeWriteCursor cursor = tx.createCursor( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(ITEM))); - allTx++; + final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem); + final NodeIdentifierWithPredicates entryId = new NodeIdentifierWithPredicates(ITEM, NUMBER, i); if (usedValues.contains(i)) { LOG.debug("Deleting item: {}", i); deleteTx++; - cursor.delete(entryId.getLastPathArgument()); + cursor.delete(entryId); usedValues.remove(i); } else { LOG.debug("Inserting item: {}", i); insertTx++; - final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i); - cursor.write(entryId.getLastPathArgument(), entry); + + final MapEntryNode entry = ImmutableNodes.mapEntryBuilder().withNodeIdentifier(entryId) + .withChild(ImmutableNodes.leafNode(NUMBER, i)).build(); + cursor.write(entryId, entry); usedValues.add(i); } @@ -185,43 +125,27 @@ public class ProduceTransactionsHandler implements Runnable { return tx.submit(); } - private void maybeFinish(final long current) { - if ((current - startTime) > timeToTake) { - LOG.debug("Reached max running time, waiting for futures to complete."); - scheduledFuture.cancel(false); + @Override + void runFailed(final Throwable cause) { + future.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build()); + } - final ListenableFuture> allFutures = Futures.allAsList(futures); + @Override + void runSuccessful(final long allTx) { + final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder() + .setAllTx(allTx) + .setInsertTx(insertTx) + .setDeleteTx(deleteTx) + .build(); + future.set(RpcResultBuilder.success() + .withResult(output).build()); + } - try { - // Timeout from cds should be 2 minutes so leave some leeway. - allFutures.get(125, TimeUnit.SECONDS); - - LOG.debug("All futures completed successfully."); - - final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder() - .setAllTx(allTx) - .setInsertTx(insertTx) - .setDeleteTx(deleteTx) - .build(); - - - completionFuture.set(RpcResultBuilder.success() - .withResult(output).build()); - - executor.shutdown(); - } catch (Exception exception) { - LOG.error("Write transactions failed.", exception); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build()); - - executor.shutdown(); - } finally { - try { - itemProducer.close(); - } catch (final DOMDataTreeProducerException e) { - LOG.warn("Failure while closing item producer.", e); - } - } - } + @Override + void runTimedOut(final Exception cause) { + future.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, + "Final submit was timed out by the test provider or was interrupted", cause).build()); } }