X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FWriteTransactionsHandler.java;h=797e252a914a6cdcd0a3811e1dcae9a4c12a2c6b;hp=a026e6f2f3b00f8c536d099616acbff0962df356;hb=d97061af6814ad7b085af10797a252aa4aa5cda6;hpb=bc5486e6d9fab8f550be8b72874ce96a9eb52651 diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java index a026e6f2f3..797e252a91 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java @@ -8,22 +8,14 @@ package org.opendaylight.controller.clustering.it.provider.impl; -import com.google.common.base.Stopwatch; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedHashSet; -import java.util.List; import java.util.Set; import java.util.SplittableRandom; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; @@ -38,7 +30,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -53,155 +44,163 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableCo import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class WriteTransactionsHandler implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class); - private static final int SECOND_AS_NANO = 1000000000; - //2^20 as in the model - private static final int MAX_ITEM = 1048576; - - private static final QName ID_INTS = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern(); - private static final QName ID_INT = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern(); - private static final QName ID = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern(); - private static final QName ITEM = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern(); - private static final QName NUMBER = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern(); - - public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS); - public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized(); - - private final WriteTransactionsInput input; - - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final List> futures = new ArrayList<>(); - private final Set usedValues = new HashSet<>(); - - private RandomnessProvider random; - private TxProvider txProvider; +public abstract class WriteTransactionsHandler extends AbstractTransactionHandler { + private static final class Chained extends WriteTransactionsHandler implements TransactionChainListener { + private final SplittableRandom random = new SplittableRandom(); + private final DOMTransactionChain transactionChain; - private final DOMDataBroker domDataBroker; - private final Long runtimeNanos; - private final Long delayNanos; - private final String id; + Chained(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem, + final WriteTransactionsInput input) { + super(idListItem, input); + transactionChain = dataBroker.createTransactionChain(this); + } - private SettableFuture> completionFuture; - private Stopwatch stopwatch; + @Override + DOMDataWriteTransaction createTransaction() { + return transactionChain.newWriteOnlyTransaction(); + } - private long allTx = 0; - private long insertTx = 0; - private long deleteTx = 0; - private ScheduledFuture scheduledFuture; - private YangInstanceIdentifier idListItem; + @Override + int nextInt(final int bound) { + return random.nextInt(bound); + } - public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) { - this.domDataBroker = domDataBroker; - this.input = input; + @Override + public void onTransactionChainFailed(final TransactionChain chain, + final AsyncTransaction transaction, final Throwable cause) { + LOG.warn("Transaction chain failed.", cause); + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build()); + } - runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds()); - delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond(); - id = input.getId(); + @Override + public void onTransactionChainSuccessful(final TransactionChain chain) { + LOG.debug("Transaction chain closed successfully."); + } } - @Override - public void run() { - futures.add(execWrite(futures.size())); - maybeFinish(); - } + private static final class Simple extends WriteTransactionsHandler { + private final LinkedHashSet previousNumbers = new LinkedHashSet<>(); + private final SplittableRandom random = new SplittableRandom(); + private final DOMDataBroker dataBroker; - public void start(final SettableFuture> settableFuture) { - LOG.debug("Starting write-transactions."); + Simple(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem, + final WriteTransactionsInput input) { + super(idListItem, input); + this.dataBroker = Preconditions.checkNotNull(dataBroker); + } - if (input.isChainedTransactions()) { - txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor); - random = new BasicProvider(); - } else { - txProvider = new DataBrokerBackedProvider(domDataBroker); - random = new NonConflictingProvider(); + @Override + DOMDataWriteTransaction createTransaction() { + return dataBroker.newWriteOnlyTransaction(); } - if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) { - stopwatch = Stopwatch.createStarted(); - completionFuture = settableFuture; - scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS); - } else { - executor.shutdown(); + @Override + int nextInt(final int bound) { + int nextInt; + do { + nextInt = random.nextInt(bound); + } while (previousNumbers.contains(nextInt)); + + if (previousNumbers.size() > 100000) { + previousNumbers.iterator().remove(); + } + previousNumbers.add(nextInt); + + return nextInt; } } - private boolean ensureListExists(final SettableFuture> settableFuture) { + private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class); + + final SettableFuture> completionFuture = SettableFuture.create(); + private final Set usedValues = new HashSet<>(); + private final YangInstanceIdentifier idListItem; + + private long insertTx = 0; + private long deleteTx = 0; + + WriteTransactionsHandler(final YangInstanceIdentifier idListItem, final WriteTransactionsInput input) { + super(input); + this.idListItem = Preconditions.checkNotNull(idListItem); + } + + public static ListenableFuture> start(final DOMDataBroker domDataBroker, + final WriteTransactionsInput input) { + LOG.debug("Starting write-transactions."); + + final String id = input.getId(); + final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id) + .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) + .build(); + final YangInstanceIdentifier idListItem = ID_INT_YID.node(entry.getIdentifier()); final ContainerNode containerNode = ImmutableContainerNodeBuilder.create() .withNodeIdentifier(new NodeIdentifier(ID_INTS)) .withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build()) .build(); - DOMDataWriteTransaction tx = txProvider.createTransaction(); + DOMDataWriteTransaction tx = domDataBroker.newWriteOnlyTransaction(); // write only the top list tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode); try { - tx.submit().checkedGet(125, TimeUnit.SECONDS); + tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (final OptimisticLockFailedException e) { // when multiple write-transactions are executed concurrently we need to ignore this. // If we get optimistic lock here it means id-ints already exists and we can continue. LOG.debug("Got an optimistic lock when writing initial top level list element.", e); } catch (final TransactionCommitFailedException | TimeoutException e) { LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); - settableFuture.set(RpcResultBuilder.failed() + return Futures.immediateFuture(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; } - final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id) - .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) - .build(); - - idListItem = ID_INT_YID.node(entry.getIdentifier()); - tx = txProvider.createTransaction(); + tx = domDataBroker.newWriteOnlyTransaction(); tx.merge(LogicalDatastoreType.CONFIGURATION, idListItem, entry); try { - tx.submit().checkedGet(125, TimeUnit.SECONDS); - return true; + tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (final Exception e) { LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); - settableFuture.set(RpcResultBuilder.failed() + return Futures.immediateFuture(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; } - } - private boolean fillInitialList(final SettableFuture> settableFuture) { LOG.debug("Filling the item list with initial values."); final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM); final YangInstanceIdentifier itemListId = idListItem.node(ITEM); - final DOMDataWriteTransaction tx = txProvider.createTransaction(); + tx = domDataBroker.newWriteOnlyTransaction(); tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build()); try { - tx.submit().checkedGet(125, TimeUnit.SECONDS); - return true; + tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (final Exception e) { LOG.warn("Unable to fill the initial item list.", e); - settableFuture.set(RpcResultBuilder.failed() + return Futures.immediateFuture(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; } + + final WriteTransactionsHandler handler; + if (input.isChainedTransactions()) { + handler = new Chained(domDataBroker, idListItem, input); + } else { + handler = new Simple(domDataBroker, idListItem, input); + } + + handler.doStart(); + return handler.completionFuture; } - private ListenableFuture execWrite(final int offset) { - final int i = random.nextInt(MAX_ITEM + 1); + @Override + ListenableFuture execWrite(final long txId) { + final int i = nextInt(MAX_ITEM + 1); final YangInstanceIdentifier entryId = idListItem.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i)); - final DOMDataWriteTransaction tx = txProvider.createTransaction(); - allTx++; + final DOMDataWriteTransaction tx = createTransaction(); if (usedValues.contains(i)) { LOG.debug("Deleting item: {}", i); @@ -217,181 +216,35 @@ public class WriteTransactionsHandler implements Runnable { usedValues.add(i); } - final ListenableFuture future = tx.submit(); - if (LOG.isDebugEnabled()) { - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - LOG.debug("Future #{} completed successfully", offset); - } - - @Override - public void onFailure(final Throwable cause) { - LOG.debug("Future #{} failed", offset, cause); - } - }); - } - - return future; - } - - private void maybeFinish() { - final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); - if (elapsed >= runtimeNanos) { - LOG.debug("Reached max running time, waiting for futures to complete."); - scheduledFuture.cancel(false); - - final ListenableFuture> allFutures = Futures.allAsList(futures); - - try { - // Timeout from cds should be 2 minutes so leave some leeway. - allFutures.get(125, TimeUnit.SECONDS); - - LOG.debug("All futures completed successfully."); - - final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder() - .setAllTx(allTx) - .setInsertTx(insertTx) - .setDeleteTx(deleteTx) - .build(); - - completionFuture.set(RpcResultBuilder.success() - .withResult(output).build()); - - executor.shutdown(); - } catch (final ExecutionException e) { - LOG.error("Write transactions failed.", e.getCause()); - - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build()); - } catch (InterruptedException | TimeoutException e) { - LOG.error("Write transactions failed.", e); - - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, - "Final submit was timed out by the test provider or was interrupted", e).build()); - - for (int i = 0; i < futures.size(); i++) { - final ListenableFuture future = futures.get(i); - - try { - future.get(0, TimeUnit.NANOSECONDS); - } catch (final TimeoutException fe) { - LOG.warn("Future #{}/{} not completed yet", i, futures.size()); - } catch (final ExecutionException fe) { - LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause()); - } catch (final InterruptedException fe) { - LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e); - } - } - } catch (Exception exception) { - LOG.error("Write transactions failed.", exception); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build()); - - executor.shutdown(); - } - } - } - - private interface RandomnessProvider { - int nextInt(int bound); - } - - private static class NonConflictingProvider implements RandomnessProvider { - - private final SplittableRandom random = new SplittableRandom(); - private final LinkedHashSet previousNumbers = new LinkedHashSet<>(); - - @Override - public int nextInt(int bound) { - int nextInt; - do { - nextInt = random.nextInt(bound); - } while (previousNumbers.contains(nextInt)); - - if (previousNumbers.size() > 100000) { - previousNumbers.iterator().remove(); - } - previousNumbers.add(nextInt); - - return nextInt; - } + return tx.submit(); } - private static class BasicProvider implements RandomnessProvider { - - private final SplittableRandom random = new SplittableRandom(); - - @Override - public int nextInt(int bound) { - return random.nextInt(bound); - } - } - - private interface TxProvider { - - DOMDataWriteTransaction createTransaction(); + @Override + void runFailed(final Throwable cause) { + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build()); } - private static class TxChainBackedProvider implements TxProvider { - - private final DOMTransactionChain transactionChain; - - TxChainBackedProvider(final DOMDataBroker dataBroker, - final SettableFuture> completionFuture, - final ScheduledExecutorService executor) { - - transactionChain = - dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor)); - } + @Override + void runSuccessful(final long allTx) { + final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder() + .setAllTx(allTx) + .setInsertTx(insertTx) + .setDeleteTx(deleteTx) + .build(); - @Override - public DOMDataWriteTransaction createTransaction() { - return transactionChain.newWriteOnlyTransaction(); - } + completionFuture.set(RpcResultBuilder.success() + .withResult(output).build()); } - private static class DataBrokerBackedProvider implements TxProvider { - - private final DOMDataBroker dataBroker; - - DataBrokerBackedProvider(final DOMDataBroker dataBroker) { - this.dataBroker = dataBroker; - } - - @Override - public DOMDataWriteTransaction createTransaction() { - return dataBroker.newWriteOnlyTransaction(); - } + @Override + void runTimedOut(final Exception cause) { + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, + "Final submit was timed out by the test provider or was interrupted", cause).build()); } - private static class TestChainListener implements TransactionChainListener { - - private final SettableFuture> resultFuture; - private final ScheduledExecutorService executor; - - TestChainListener(final SettableFuture> resultFuture, - final ScheduledExecutorService executor) { - - this.resultFuture = resultFuture; - this.executor = executor; - } - - @Override - public void onTransactionChainFailed(final TransactionChain chain, - final AsyncTransaction transaction, - final Throwable cause) { - LOG.warn("Transaction chain failed.", cause); - resultFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build()); - - executor.shutdown(); - } + abstract DOMDataWriteTransaction createTransaction(); - @Override - public void onTransactionChainSuccessful(final TransactionChain chain) { - LOG.debug("Transaction chain closed successfully."); - } - } + abstract int nextInt(int bound); }