From bb61cf2bfc27e04d157f08fac3198fda532cebd6 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Mon, 6 Mar 2017 16:11:26 +0100 Subject: [PATCH] Bug 7801 - Implement agent RPCs for transaction writer testing Change-Id: I75e62deb62f39869be07fcb82f3faee53f337a7d Signed-off-by: Tomas Cere --- .../main/yang/odl-mdsal-lowlevel-control.yang | 18 +- .../provider/MdsalLowLevelTestProvider.java | 19 +- .../impl/WriteTransactionsHandler.java | 341 ++++++++++++++++++ .../blueprint/cluster-test-app.xml | 1 + 4 files changed, 375 insertions(+), 4 deletions(-) create mode 100644 opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java diff --git a/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang b/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang index 5a7f2e12e7..2a5299d761 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang +++ b/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang @@ -232,7 +232,23 @@ module odl-mdsal-lowlevel-control { type boolean; } } - // No output. + output { + leaf all-tx { + description "Number of all transactions executed."; + type int64; + mandatory true; + } + leaf insert-tx { + description "Number of transactions that inserted data."; + type int64; + mandatory true; + } + leaf delete-tx { + description "Number of transactions that deleted data."; + type int64; + mandatory true; + } + } } rpc produce-transactions { diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index 7ada4bea61..4ff42c14a1 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -18,9 +18,11 @@ import org.opendaylight.controller.clustering.it.provider.impl.GetConstantServic import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask; import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService; import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService; +import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler; import org.opendaylight.controller.clustering.it.provider.impl.YnlListener; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.NotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration; import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; @@ -50,6 +52,7 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; @@ -66,6 +69,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private final RpcProviderRegistry rpcRegistry; private final BindingAwareBroker.RpcRegistration registration; private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer; + private final DOMDataBroker domDataBroker; private final NotificationPublishService notificationPublishService; private final NotificationService notificationService; private final SchemaService schemaService; @@ -87,7 +91,8 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService final SchemaService schemaService, final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer, final NotificationPublishService notificationPublishService, - final NotificationService notificationService) { + final NotificationService notificationService, + final DOMDataBroker domDataBroker) { this.rpcRegistry = rpcRegistry; this.domRpcService = domRpcService; this.singletonService = singletonService; @@ -95,6 +100,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer; this.notificationPublishService = notificationPublishService; this.notificationService = notificationService; + this.domDataBroker = domDataBroker; registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this); } @@ -144,8 +150,15 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> writeTransactions(WriteTransactionsInput input) { - return null; + public Future> writeTransactions(final WriteTransactionsInput input) { + LOG.debug("write-transactions, input: {}", input); + + final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input); + + final SettableFuture> settableFuture = SettableFuture.create(); + writeTransactionsHandler.start(settableFuture); + + return settableFuture; } @Override diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java new file mode 100644 index 0000000000..fd47b7176b --- /dev/null +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java @@ -0,0 +1,341 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.clustering.it.provider.impl; + +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.SplittableRandom; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WriteTransactionsHandler implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class); + private static final int SECOND_AS_NANO = 1000000000; + //2^20 as in the model + private static final int MAX_ITEM = 1048576; + + private static final QName ID_INTS = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints"); + private static final QName ID = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id"); + private static final QName ITEM = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item"); + private static final QName NUMBER = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number"); + + public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS); + + private final DOMDataBroker domDataBroker; + private final Long timeToTake; + private final Long delay; + private final String id; + private final WriteTransactionsInput input; + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ArrayList> futures = new ArrayList<>(); + private final Set usedValues = new HashSet<>(); + + private RandomnessProvider random; + private TxProvider txProvider; + + private long startTime; + private SettableFuture> completionFuture; + + private long allTx = 0; + private long insertTx = 0; + private long deleteTx = 0; + private ScheduledFuture scheduledFuture; + private YangInstanceIdentifier idListWithKey; + + public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) { + this.domDataBroker = domDataBroker; + this.input = input; + + timeToTake = input.getSeconds() * SECOND_AS_NANO; + delay = SECOND_AS_NANO / input.getTransactionsPerSecond(); + id = input.getId(); + } + + @Override + public void run() { + final long current = System.nanoTime(); + + futures.add(execWrite()); + + maybeFinish(current); + } + + public void start(final SettableFuture> settableFuture) { + LOG.debug("Starting write-transactions."); + + if (input.isChainedTransactions()) { + txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor); + random = new BasicProvider(); + } else { + txProvider = new DataBrokerBackedProvider(domDataBroker); + random = new NonConflictingProvider(); + } + + if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) { + startTime = System.nanoTime(); + completionFuture = settableFuture; + scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS); + } else { + executor.shutdown(); + } + } + + private boolean ensureListExists(final SettableFuture> settableFuture) { + + final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id) + .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) + .build(); + final MapNode mapNode = + ImmutableNodes.mapNodeBuilder(ID_INTS) + .withChild(entry) + .build(); + + final DOMDataWriteTransaction tx = txProvider.createTransaction(); + idListWithKey = ID_INTS_YID.node(entry.getIdentifier()); + tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, mapNode); + + try { + tx.submit().checkedGet(); + } catch (final TransactionCommitFailedException e) { + LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); + settableFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); + return false; + } + + return true; + } + + private boolean fillInitialList(final SettableFuture> settableFuture) { + LOG.debug("Filling the item list with initial values."); + + final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM); + for (int i = 0; i < MAX_ITEM / 2; i++) { + usedValues.add(i); + mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i)); + } + + final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM); + final DOMDataWriteTransaction tx = txProvider.createTransaction(); + tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build()); + + try { + tx.submit().checkedGet(); + } catch (final TransactionCommitFailedException e) { + LOG.warn("Unable to fill the initial item list.", e); + settableFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); + return false; + } + + return true; + } + + private CheckedFuture execWrite() { + final int i = random.nextInt(MAX_ITEM + 1); + + final YangInstanceIdentifier entryId = + idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i)); + + final DOMDataWriteTransaction tx = txProvider.createTransaction(); + allTx++; + + if (usedValues.contains(i)) { + LOG.debug("Deleting item: {}", i); + deleteTx++; + tx.delete(LogicalDatastoreType.CONFIGURATION, entryId); + usedValues.remove(i); + + } else { + LOG.debug("Inserting item: {}", i); + insertTx++; + final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i); + tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry); + usedValues.add(i); + } + + return tx.submit(); + } + + private void maybeFinish(final long current) { + if ((current - startTime) > timeToTake) { + LOG.debug("Reached max running time, waiting for futures to complete."); + scheduledFuture.cancel(false); + + final ListenableFuture> allFutures = Futures.allAsList(futures); + + Futures.addCallback(allFutures, new FutureCallback>() { + @Override + public void onSuccess(@Nullable final List result) { + LOG.debug("All futures completed successfully."); + + final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder() + .setAllTx(allTx) + .setInsertTx(insertTx) + .setDeleteTx(deleteTx) + .build(); + + completionFuture.set(RpcResultBuilder.success() + .withResult(output).build()); + + executor.shutdown(); + } + + @Override + public void onFailure(final Throwable t) { + LOG.error("Write transactions failed.", t); + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build()); + + executor.shutdown(); + } + }); + } + } + + private interface RandomnessProvider { + int nextInt(int bound); + } + + private static class NonConflictingProvider implements RandomnessProvider { + + private final SplittableRandom random = new SplittableRandom(); + private final LinkedHashSet previousNumbers = new LinkedHashSet<>(); + + @Override + public int nextInt(int bound) { + int nextInt; + do { + nextInt = random.nextInt(bound); + } while (previousNumbers.contains(nextInt)); + + if (previousNumbers.size() > 100000) { + previousNumbers.iterator().remove(); + } + previousNumbers.add(nextInt); + + return nextInt; + } + } + + private static class BasicProvider implements RandomnessProvider { + + private final SplittableRandom random = new SplittableRandom(); + + @Override + public int nextInt(int bound) { + return random.nextInt(bound); + } + } + + private interface TxProvider { + + DOMDataWriteTransaction createTransaction(); + } + + private static class TxChainBackedProvider implements TxProvider { + + private final DOMTransactionChain transactionChain; + + TxChainBackedProvider(final DOMDataBroker dataBroker, + final SettableFuture> completionFuture, + final ScheduledExecutorService executor) { + + transactionChain = + dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor)); + } + + @Override + public DOMDataWriteTransaction createTransaction() { + return transactionChain.newWriteOnlyTransaction(); + } + } + + private static class DataBrokerBackedProvider implements TxProvider { + + private final DOMDataBroker dataBroker; + + DataBrokerBackedProvider(final DOMDataBroker dataBroker) { + this.dataBroker = dataBroker; + } + + @Override + public DOMDataWriteTransaction createTransaction() { + return dataBroker.newWriteOnlyTransaction(); + } + } + + private static class TestChainListener implements TransactionChainListener { + + private final SettableFuture> resultFuture; + private final ScheduledExecutorService executor; + + TestChainListener(final SettableFuture> resultFuture, + final ScheduledExecutorService executor) { + + this.resultFuture = resultFuture; + this.executor = executor; + } + + @Override + public void onTransactionChainFailed(final TransactionChain chain, + final AsyncTransaction transaction, + final Throwable cause) { + LOG.warn("Transaction chain failed.", cause); + resultFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build()); + + executor.shutdown(); + } + + @Override + public void onTransactionChainSuccessful(final TransactionChain chain) { + LOG.debug("Transaction chain closed successfully."); + } + } +} diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/org/opendaylight/blueprint/cluster-test-app.xml b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/org/opendaylight/blueprint/cluster-test-app.xml index cb7a29a1e3..0fb755ee98 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/org/opendaylight/blueprint/cluster-test-app.xml +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/org/opendaylight/blueprint/cluster-test-app.xml @@ -56,6 +56,7 @@ + -- 2.36.6