From: Tomas Cere Date: Fri, 17 Mar 2017 09:40:38 +0000 (+0100) Subject: Bug 7802 : Implement agent RPCs for transaction producer testing X-Git-Tag: release/carbon~118 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=909db4bfbe3e4e036fc3e968ba6e2b1af150ee66 Bug 7802 : Implement agent RPCs for transaction producer testing Change-Id: I56d89093bd292032f92cdc98f25056822d93e628 Signed-off-by: Tomas Cere --- diff --git a/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang b/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang index 1420a0b8e9..302fbbd362 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang +++ b/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang @@ -321,7 +321,23 @@ module odl-mdsal-lowlevel-control { type boolean; } } - // No output. + output { + leaf all-tx { + description "Number of all transactions executed."; + type int64; + mandatory true; + } + leaf insert-tx { + description "Number of transactions that inserted data."; + type int64; + mandatory true; + } + leaf delete-tx { + description "Number of transactions that deleted data."; + type int64; + mandatory true; + } + } } rpc become-prefix-leader { diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/pom.xml b/opendaylight/md-sal/samples/clustering-test-app/provider/pom.xml index 2b38e0f5e3..14f6ef7560 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/pom.xml +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/pom.xml @@ -47,5 +47,9 @@ org.opendaylight.controller sal-common-util + + org.opendaylight.controller + sal-distributed-datastore + diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index 6b94983b6f..f47a66ee15 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -15,8 +15,10 @@ import java.io.StringWriter; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; +import org.opendaylight.controller.cluster.sharding.DistributedShardFactory; import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService; import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService; +import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler; import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask; import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService; import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService; @@ -31,6 +33,7 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; +import org.opendaylight.mdsal.dom.api.DOMDataTreeService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput; @@ -42,6 +45,7 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput; @@ -73,6 +77,8 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private final RpcProviderRegistry rpcRegistry; private final BindingAwareBroker.RpcRegistration registration; + private final DistributedShardFactory distributedShardFactory; + private final DOMDataTreeService domDataTreeService; private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer; private final DOMDataBroker domDataBroker; private final NotificationPublishService notificationPublishService; @@ -98,7 +104,9 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer, final NotificationPublishService notificationPublishService, final NotificationService notificationService, - final DOMDataBroker domDataBroker) { + final DOMDataBroker domDataBroker, + final DOMDataTreeService domDataTreeService, + final DistributedShardFactory distributedShardFactory) { this.rpcRegistry = rpcRegistry; this.domRpcService = domRpcService; this.singletonService = singletonService; @@ -107,6 +115,8 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService this.notificationPublishService = notificationPublishService; this.notificationService = notificationService; this.domDataBroker = domDataBroker; + this.domDataTreeService = domDataTreeService; + this.distributedShardFactory = distributedShardFactory; registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this); } @@ -396,8 +406,16 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> produceTransactions(ProduceTransactionsInput input) { - return null; + public Future> produceTransactions(final ProduceTransactionsInput input) { + LOG.debug("producer-transactions, input: {}", input); + + final ProduceTransactionsHandler handler = + new ProduceTransactionsHandler(domDataTreeService, input); + + final SettableFuture> settableFuture = SettableFuture.create(); + handler.start(settableFuture); + + return settableFuture; } @Override diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java new file mode 100644 index 0000000000..49024ef283 --- /dev/null +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java @@ -0,0 +1,259 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.clustering.it.provider.impl; + +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SplittableRandom; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeService; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProduceTransactionsHandler implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class); + private static final int SECOND_AS_NANO = 1000000000; + //2^20 as in the model + private static final int MAX_ITEM = 1048576; + + private static final QName ID_INTS = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints"); + private static final QName ID = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id"); + private static final QName ITEM = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item"); + private static final QName NUMBER = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number"); + + private final YangInstanceIdentifier ID_INTS_YID = + YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(ID_INTS)); + + private final DOMDataTreeService domDataTreeService; + + private final long timeToTake; + private final long delay; + private final String id; + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ArrayList> futures = new ArrayList<>(); + private final Set usedValues = new HashSet<>(); + private final SplittableRandom random = new SplittableRandom(); + + private long startTime; + private SettableFuture> completionFuture; + + private long allTx = 0; + private long insertTx = 0; + private long deleteTx = 0; + private ScheduledFuture scheduledFuture; + private YangInstanceIdentifier idListWithKey; + private DOMDataTreeProducer itemProducer; + + public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService, + final ProduceTransactionsInput input) { + + this.domDataTreeService = domDataTreeService; + + timeToTake = input.getSeconds() * SECOND_AS_NANO; + delay = SECOND_AS_NANO / input.getTransactionsPerSecond(); + id = input.getId(); + } + + @Override + public void run() { + final long current = System.nanoTime(); + + futures.add(execWrite()); + + maybeFinish(current); + } + + public void start(final SettableFuture> settableFuture) { + + if (ensureListExists(completionFuture) && fillInitialList(completionFuture)) { + startTime = System.nanoTime(); + completionFuture = settableFuture; + scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS); + } else { + executor.shutdown(); + } + } + + private boolean ensureListExists(final SettableFuture> settableFuture) { + + final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id) + .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) + .build(); + final MapNode mapNode = + ImmutableNodes.mapNodeBuilder(ID_INTS) + .withChild(entry) + .build(); + + final DOMDataTreeProducer producer = domDataTreeService.createProducer(Collections.singleton( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY))); + + final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false); + + final DOMDataTreeWriteCursor cursor = + tx.createCursor(new DOMDataTreeIdentifier( + LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY)); + + idListWithKey = ID_INTS_YID.node(entry.getIdentifier()); + + cursor.merge(mapNode.getIdentifier(), mapNode); + cursor.close(); + + try { + tx.submit().checkedGet(); + } catch (TransactionCommitFailedException e) { + LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); + settableFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); + return false; + } finally { + try { + producer.close(); + } catch (DOMDataTreeProducerException e) { + LOG.warn("Error while closing producer.", e); + } + } + + return true; + } + + private boolean fillInitialList(final SettableFuture> settableFuture) { + LOG.debug("Filling the item list with initial values."); + + final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM); + for (int i = 0; i < MAX_ITEM / 2; i++) { + usedValues.add(i); + mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i)); + } + + itemProducer = domDataTreeService.createProducer( + Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); + + final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); + final DOMDataTreeWriteCursor cursor = + tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)); + + final MapNode list = mapBuilder.build(); + cursor.write(list.getIdentifier(), list); + cursor.close(); + + try { + tx.submit().checkedGet(); + } catch (final TransactionCommitFailedException e) { + LOG.warn("Unable to fill the initial item list.", e); + settableFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); + return false; + } + + return true; + } + + private CheckedFuture execWrite() { + final int i = random.nextInt(MAX_ITEM + 1); + + final YangInstanceIdentifier entryId = + idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i)); + + final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); + final DOMDataTreeWriteCursor cursor = tx.createCursor( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(ITEM))); + allTx++; + + if (usedValues.contains(i)) { + LOG.debug("Deleting item: {}", i); + deleteTx++; + cursor.delete(entryId.getLastPathArgument()); + usedValues.remove(i); + + } else { + LOG.debug("Inserting item: {}", i); + insertTx++; + final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i); + cursor.write(entryId.getLastPathArgument(), entry); + usedValues.add(i); + } + + cursor.close(); + + return tx.submit(); + } + + private void maybeFinish(final long current) { + if ((current - startTime) > timeToTake) { + LOG.debug("Reached max running time, waiting for futures to complete."); + scheduledFuture.cancel(false); + + final ListenableFuture> allFutures = Futures.allAsList(futures); + + Futures.addCallback(allFutures, new FutureCallback>() { + @Override + public void onSuccess(@Nullable final List result) { + LOG.debug("All futures completed successfully."); + + final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder() + .setAllTx(allTx) + .setInsertTx(insertTx) + .setDeleteTx(deleteTx) + .build(); + + completionFuture.set(RpcResultBuilder.success() + .withResult(output).build()); + + executor.shutdown(); + } + + @Override + public void onFailure(final Throwable t) { + LOG.error("Write transactions failed.", t); + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build()); + + executor.shutdown(); + } + }); + } + } +} diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/org/opendaylight/blueprint/cluster-test-app.xml b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/org/opendaylight/blueprint/cluster-test-app.xml index 0fb755ee98..43a31df09f 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/org/opendaylight/blueprint/cluster-test-app.xml +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/org/opendaylight/blueprint/cluster-test-app.xml @@ -15,6 +15,8 @@ + + @@ -57,6 +59,8 @@ + +