X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FWriteTransactionsHandler.java;h=00d7f73d1b685fbeb258e09f391e5eaca2c7ea6c;hb=da174be7e22b16d4ac80cccefdc52b209b700745;hp=fd47b7176b2411b2bb629d65c56c5a1bac7474af;hpb=bb61cf2bfc27e04d157f08fac3198fda532cebd6;p=controller.git diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java index fd47b7176b..00d7f73d1b 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java @@ -5,250 +5,100 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.clustering.it.provider.impl; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; -import java.util.HashSet; import java.util.LinkedHashSet; -import java.util.List; import java.util.Set; import java.util.SplittableRandom; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class WriteTransactionsHandler implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class); - private static final int SECOND_AS_NANO = 1000000000; - //2^20 as in the model - private static final int MAX_ITEM = 1048576; - - private static final QName ID_INTS = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints"); - private static final QName ID = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id"); - private static final QName ITEM = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item"); - private static final QName NUMBER = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number"); - - public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS); - - private final DOMDataBroker domDataBroker; - private final Long timeToTake; - private final Long delay; - private final String id; - private final WriteTransactionsInput input; - - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final ArrayList> futures = new ArrayList<>(); - private final Set usedValues = new HashSet<>(); - - private RandomnessProvider random; - private TxProvider txProvider; - - private long startTime; - private SettableFuture> completionFuture; - - private long allTx = 0; - private long insertTx = 0; - private long deleteTx = 0; - private ScheduledFuture scheduledFuture; - private YangInstanceIdentifier idListWithKey; - - public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) { - this.domDataBroker = domDataBroker; - this.input = input; - - timeToTake = input.getSeconds() * SECOND_AS_NANO; - delay = SECOND_AS_NANO / input.getTransactionsPerSecond(); - id = input.getId(); - } - - @Override - public void run() { - final long current = System.nanoTime(); - - futures.add(execWrite()); - - maybeFinish(current); - } - - public void start(final SettableFuture> settableFuture) { - LOG.debug("Starting write-transactions."); +public abstract class WriteTransactionsHandler extends AbstractTransactionHandler { + private static final class Chained extends WriteTransactionsHandler implements DOMTransactionChainListener { + private final SplittableRandom random = new SplittableRandom(); + private final DOMTransactionChain transactionChain; - if (input.isChainedTransactions()) { - txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor); - random = new BasicProvider(); - } else { - txProvider = new DataBrokerBackedProvider(domDataBroker); - random = new NonConflictingProvider(); + Chained(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem, + final WriteTransactionsInput input) { + super(idListItem, input); + transactionChain = dataBroker.createTransactionChain(this); } - if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) { - startTime = System.nanoTime(); - completionFuture = settableFuture; - scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS); - } else { - executor.shutdown(); + @Override + DOMDataTreeWriteTransaction createTransaction() { + return transactionChain.newWriteOnlyTransaction(); } - } - - private boolean ensureListExists(final SettableFuture> settableFuture) { - - final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id) - .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) - .build(); - final MapNode mapNode = - ImmutableNodes.mapNodeBuilder(ID_INTS) - .withChild(entry) - .build(); - - final DOMDataWriteTransaction tx = txProvider.createTransaction(); - idListWithKey = ID_INTS_YID.node(entry.getIdentifier()); - tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, mapNode); - try { - tx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { - LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); - settableFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; + @Override + int nextInt(final int bound) { + return random.nextInt(bound); } - return true; - } - - private boolean fillInitialList(final SettableFuture> settableFuture) { - LOG.debug("Filling the item list with initial values."); - - final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM); - for (int i = 0; i < MAX_ITEM / 2; i++) { - usedValues.add(i); - mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i)); + @Override + public void onTransactionChainFailed(final DOMTransactionChain chain, final DOMDataTreeTransaction transaction, + final Throwable cause) { + // This is expected to happen frequently in isolation testing. + LOG.debug("Transaction chain failed.", cause); + // Do not return RPC here, rely on transaction failure to call runFailed. } - final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM); - final DOMDataWriteTransaction tx = txProvider.createTransaction(); - tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build()); - - try { - tx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { - LOG.warn("Unable to fill the initial item list.", e); - settableFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; + @Override + public void onTransactionChainSuccessful(final DOMTransactionChain chain) { + LOG.debug("Transaction chain closed successfully."); } - - return true; } - private CheckedFuture execWrite() { - final int i = random.nextInt(MAX_ITEM + 1); - - final YangInstanceIdentifier entryId = - idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i)); - - final DOMDataWriteTransaction tx = txProvider.createTransaction(); - allTx++; - - if (usedValues.contains(i)) { - LOG.debug("Deleting item: {}", i); - deleteTx++; - tx.delete(LogicalDatastoreType.CONFIGURATION, entryId); - usedValues.remove(i); + private static final class Simple extends WriteTransactionsHandler { + private final LinkedHashSet previousNumbers = new LinkedHashSet<>(); + private final SplittableRandom random = new SplittableRandom(); + private final DOMDataBroker dataBroker; - } else { - LOG.debug("Inserting item: {}", i); - insertTx++; - final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i); - tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry); - usedValues.add(i); + Simple(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem, + final WriteTransactionsInput input) { + super(idListItem, input); + this.dataBroker = requireNonNull(dataBroker); } - return tx.submit(); - } - - private void maybeFinish(final long current) { - if ((current - startTime) > timeToTake) { - LOG.debug("Reached max running time, waiting for futures to complete."); - scheduledFuture.cancel(false); - - final ListenableFuture> allFutures = Futures.allAsList(futures); - - Futures.addCallback(allFutures, new FutureCallback>() { - @Override - public void onSuccess(@Nullable final List result) { - LOG.debug("All futures completed successfully."); - - final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder() - .setAllTx(allTx) - .setInsertTx(insertTx) - .setDeleteTx(deleteTx) - .build(); - - completionFuture.set(RpcResultBuilder.success() - .withResult(output).build()); - - executor.shutdown(); - } - - @Override - public void onFailure(final Throwable t) { - LOG.error("Write transactions failed.", t); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build()); - - executor.shutdown(); - } - }); + @Override + DOMDataTreeWriteTransaction createTransaction() { + return dataBroker.newWriteOnlyTransaction(); } - } - - private interface RandomnessProvider { - int nextInt(int bound); - } - - private static class NonConflictingProvider implements RandomnessProvider { - - private final SplittableRandom random = new SplittableRandom(); - private final LinkedHashSet previousNumbers = new LinkedHashSet<>(); @Override - public int nextInt(int bound) { + int nextInt(final int bound) { int nextInt; do { nextInt = random.nextInt(bound); @@ -263,79 +113,153 @@ public class WriteTransactionsHandler implements Runnable { } } - private static class BasicProvider implements RandomnessProvider { - - private final SplittableRandom random = new SplittableRandom(); + private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class); - @Override - public int nextInt(int bound) { - return random.nextInt(bound); - } - } + final SettableFuture> completionFuture = SettableFuture.create(); + private final Set usedValues = ConcurrentHashMap.newKeySet(); + private final YangInstanceIdentifier idListItem; - private interface TxProvider { + private final AtomicLong insertTx = new AtomicLong(); + private final AtomicLong deleteTx = new AtomicLong(); - DOMDataWriteTransaction createTransaction(); + WriteTransactionsHandler(final YangInstanceIdentifier idListItem, final WriteTransactionsInput input) { + super(input); + this.idListItem = requireNonNull(idListItem); } - private static class TxChainBackedProvider implements TxProvider { + public static ListenableFuture> start(final DOMDataBroker domDataBroker, + final WriteTransactionsInput input) { + LOG.info("Starting write transactions with input {}", input); - private final DOMTransactionChain transactionChain; + final String id = input.getId(); + final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id) + .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) + .build(); + final YangInstanceIdentifier idListItem = ID_INT_YID.node(entry.getIdentifier()); - TxChainBackedProvider(final DOMDataBroker dataBroker, - final SettableFuture> completionFuture, - final ScheduledExecutorService executor) { + final ContainerNode containerNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(ID_INTS)) + .withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build()) + .build(); - transactionChain = - dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor)); + DOMDataTreeWriteTransaction tx = domDataBroker.newWriteOnlyTransaction(); + // write only the top list + tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode); + try { + tx.commit().get(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException | TimeoutException e) { + LOG.error("Error writing top-level path {}: {}", ID_INTS_YID, containerNode, e); + return RpcResultBuilder.failed().withError(RpcError.ErrorType.APPLICATION, + String.format("Could not start write transactions - error writing top-level path %s: %s", + ID_INTS_YID, containerNode), e).buildFuture(); + } catch (ExecutionException e) { + final Throwable cause = e.getCause(); + if (cause instanceof OptimisticLockFailedException) { + // when multiple write-transactions are executed concurrently we need to ignore this. + // If we get optimistic lock here it means id-ints already exists and we can continue. + LOG.debug("Got an optimistic lock when writing initial top level list element.", e); + } else { + LOG.error("Error writing top-level path {}: {}", ID_INTS_YID, containerNode, e); + return RpcResultBuilder.failed().withError(RpcError.ErrorType.APPLICATION, + String.format("Could not start write transactions - error writing top-level path %s: %s", + ID_INTS_YID, containerNode), e).buildFuture(); + } } - @Override - public DOMDataWriteTransaction createTransaction() { - return transactionChain.newWriteOnlyTransaction(); + tx = domDataBroker.newWriteOnlyTransaction(); + tx.merge(LogicalDatastoreType.CONFIGURATION, idListItem, entry); + + try { + tx.commit().get(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.error("Error writing top-level path {}: {}", idListItem, entry, e); + return RpcResultBuilder.failed().withError(RpcError.ErrorType.APPLICATION, + String.format("Could not start write transactions - error writing list entry path %s: %s", + idListItem, entry), e).buildFuture(); } - } - private static class DataBrokerBackedProvider implements TxProvider { + LOG.debug("Filling the item list with initial values."); - private final DOMDataBroker dataBroker; + final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM); - DataBrokerBackedProvider(final DOMDataBroker dataBroker) { - this.dataBroker = dataBroker; + final YangInstanceIdentifier itemListId = idListItem.node(ITEM); + tx = domDataBroker.newWriteOnlyTransaction(); + final MapNode itemListNode = mapBuilder.build(); + tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, itemListNode); + + try { + tx.commit().get(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.error("Error filling initial item list path {}: {}", itemListId, itemListNode, e); + return RpcResultBuilder.failed().withError(RpcError.ErrorType.APPLICATION, + String.format("Could not start write transactions - error filling initial item list path %s: %s", + itemListId, itemListNode), e).buildFuture(); } - @Override - public DOMDataWriteTransaction createTransaction() { - return dataBroker.newWriteOnlyTransaction(); + final WriteTransactionsHandler handler; + if (input.getChainedTransactions()) { + handler = new Chained(domDataBroker, idListItem, input); + } else { + handler = new Simple(domDataBroker, idListItem, input); } + + handler.doStart(); + + LOG.info("Write transactions successfully started"); + return handler.completionFuture; } - private static class TestChainListener implements TransactionChainListener { + @Override + FluentFuture execWrite(final long txId) { + final int i = nextInt(MAX_ITEM + 1); - private final SettableFuture> resultFuture; - private final ScheduledExecutorService executor; + final YangInstanceIdentifier entryId = + idListItem.node(ITEM).node(YangInstanceIdentifier.NodeIdentifierWithPredicates.of(ITEM, NUMBER, i)); - TestChainListener(final SettableFuture> resultFuture, - final ScheduledExecutorService executor) { + final DOMDataTreeWriteTransaction tx = createTransaction(); - this.resultFuture = resultFuture; - this.executor = executor; - } + if (usedValues.contains(i)) { + LOG.debug("Deleting item: {}", i); + deleteTx.incrementAndGet(); + tx.delete(LogicalDatastoreType.CONFIGURATION, entryId); + usedValues.remove(i); - @Override - public void onTransactionChainFailed(final TransactionChain chain, - final AsyncTransaction transaction, - final Throwable cause) { - LOG.warn("Transaction chain failed.", cause); - resultFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build()); - - executor.shutdown(); + } else { + LOG.debug("Inserting item: {}", i); + insertTx.incrementAndGet(); + final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i); + tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry); + usedValues.add(i); } - @Override - public void onTransactionChainSuccessful(final TransactionChain chain) { - LOG.debug("Transaction chain closed successfully."); - } + return tx.commit(); + } + + @Override + void runFailed(final Throwable cause, final long txId) { + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Commit failed for tx # " + txId, cause).build()); + } + + @Override + void runSuccessful(final long allTx) { + final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder() + .setAllTx(allTx) + .setInsertTx(insertTx.get()) + .setDeleteTx(deleteTx.get()) + .build(); + + completionFuture.set(RpcResultBuilder.success() + .withResult(output).build()); + } + + @Override + void runTimedOut(final String cause) { + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, cause).build()); } + + abstract DOMDataTreeWriteTransaction createTransaction(); + + abstract int nextInt(int bound); }