Bug 7802 : Implement agent RPCs for transaction producer testing 78/53478/13
authorTomas Cere <tcere@cisco.com>
Fri, 17 Mar 2017 09:40:38 +0000 (10:40 +0100)
committerTom Pantelis <tompantelis@gmail.com>
Sat, 1 Apr 2017 08:39:33 +0000 (08:39 +0000)
Change-Id: I56d89093bd292032f92cdc98f25056822d93e628
Signed-off-by: Tomas Cere <tcere@cisco.com>
opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang
opendaylight/md-sal/samples/clustering-test-app/provider/pom.xml
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java [new file with mode: 0644]
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/org/opendaylight/blueprint/cluster-test-app.xml

index 1420a0b8e9b9948a07d0aa3e385f8b9ab5104589..302fbbd3626e6e4ac80f0510d70acb7b5c3582d1 100644 (file)
@@ -321,7 +321,23 @@ module odl-mdsal-lowlevel-control {
                 type boolean;
             }
         }
-        // No output.
+        output {
+            leaf all-tx {
+                description "Number of all transactions executed.";
+                type int64;
+                mandatory true;
+            }
+            leaf insert-tx {
+                description "Number of transactions that inserted data.";
+                type int64;
+                mandatory true;
+            }
+            leaf delete-tx {
+                description "Number of transactions that deleted data.";
+                type int64;
+                mandatory true;
+            }
+        }
     }
 
     rpc become-prefix-leader {
index 2b38e0f5e3e201b45fbd85d930f74055a60f3437..14f6ef75604da033bc75ce2f2cf9814d36aeb602 100644 (file)
@@ -47,5 +47,9 @@
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>sal-common-util</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-distributed-datastore</artifactId>
+        </dependency>
     </dependencies>
 </project>
index 6b94983b6f360129beead2ebb82c6ad8f1bdfe2c..f47a66ee15c8f15c40f585d992e2cd1e7df89295 100644 (file)
@@ -15,8 +15,10 @@ import java.io.StringWriter;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Future;
+import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
+import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
@@ -31,6 +33,7 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
@@ -42,6 +45,7 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
@@ -73,6 +77,8 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
 
     private final RpcProviderRegistry rpcRegistry;
     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
+    private final DistributedShardFactory distributedShardFactory;
+    private final DOMDataTreeService domDataTreeService;
     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
     private final DOMDataBroker domDataBroker;
     private final NotificationPublishService notificationPublishService;
@@ -98,7 +104,9 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
                                      final NotificationPublishService notificationPublishService,
                                      final NotificationService notificationService,
-                                     final DOMDataBroker domDataBroker) {
+                                     final DOMDataBroker domDataBroker,
+                                     final DOMDataTreeService domDataTreeService,
+                                     final DistributedShardFactory distributedShardFactory) {
         this.rpcRegistry = rpcRegistry;
         this.domRpcService = domRpcService;
         this.singletonService = singletonService;
@@ -107,6 +115,8 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         this.notificationPublishService = notificationPublishService;
         this.notificationService = notificationService;
         this.domDataBroker = domDataBroker;
+        this.domDataTreeService = domDataTreeService;
+        this.distributedShardFactory = distributedShardFactory;
 
         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
     }
@@ -396,8 +406,16 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> produceTransactions(ProduceTransactionsInput input) {
-        return null;
+    public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
+        LOG.debug("producer-transactions, input: {}", input);
+
+        final ProduceTransactionsHandler handler =
+                new ProduceTransactionsHandler(domDataTreeService, input);
+
+        final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
+        handler.start(settableFuture);
+
+        return settableFuture;
     }
 
     @Override
diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java
new file mode 100644 (file)
index 0000000..49024ef
--- /dev/null
@@ -0,0 +1,259 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.clustering.it.provider.impl;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SplittableRandom;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProduceTransactionsHandler implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
+    private static final int SECOND_AS_NANO = 1000000000;
+    //2^20 as in the model
+    private static final int MAX_ITEM = 1048576;
+
+    private static final QName ID_INTS =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
+    private static final QName ID =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
+    private static final QName ITEM =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
+    private static final QName NUMBER =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
+
+    private final YangInstanceIdentifier ID_INTS_YID =
+            YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(ID_INTS));
+
+    private final DOMDataTreeService domDataTreeService;
+
+    private final long timeToTake;
+    private final long delay;
+    private final String id;
+
+    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
+    private final Set<Integer> usedValues = new HashSet<>();
+    private final SplittableRandom random = new SplittableRandom();
+
+    private long startTime;
+    private SettableFuture<RpcResult<ProduceTransactionsOutput>> completionFuture;
+
+    private long allTx = 0;
+    private long insertTx = 0;
+    private long deleteTx = 0;
+    private ScheduledFuture<?> scheduledFuture;
+    private YangInstanceIdentifier idListWithKey;
+    private DOMDataTreeProducer itemProducer;
+
+    public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService,
+                                      final ProduceTransactionsInput input) {
+
+        this.domDataTreeService = domDataTreeService;
+
+        timeToTake = input.getSeconds() * SECOND_AS_NANO;
+        delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
+        id = input.getId();
+    }
+
+    @Override
+    public void run() {
+        final long current = System.nanoTime();
+
+        futures.add(execWrite());
+
+        maybeFinish(current);
+    }
+
+    public void start(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
+
+        if (ensureListExists(completionFuture) && fillInitialList(completionFuture)) {
+            startTime = System.nanoTime();
+            completionFuture = settableFuture;
+            scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
+        } else {
+            executor.shutdown();
+        }
+    }
+
+    private boolean ensureListExists(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
+
+        final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id)
+                .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
+                .build();
+        final MapNode mapNode =
+                ImmutableNodes.mapNodeBuilder(ID_INTS)
+                        .withChild(entry)
+                        .build();
+
+        final DOMDataTreeProducer producer = domDataTreeService.createProducer(Collections.singleton(
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY)));
+
+        final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false);
+
+        final DOMDataTreeWriteCursor cursor =
+                tx.createCursor(new DOMDataTreeIdentifier(
+                        LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY));
+
+        idListWithKey = ID_INTS_YID.node(entry.getIdentifier());
+
+        cursor.merge(mapNode.getIdentifier(), mapNode);
+        cursor.close();
+
+        try {
+            tx.submit().checkedGet();
+        } catch (TransactionCommitFailedException e) {
+            LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
+            settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
+            return false;
+        } finally {
+            try {
+                producer.close();
+            } catch (DOMDataTreeProducerException e) {
+                LOG.warn("Error while closing producer.", e);
+            }
+        }
+
+        return true;
+    }
+
+    private boolean fillInitialList(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
+        LOG.debug("Filling the item list with initial values.");
+
+        final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
+        for (int i = 0; i < MAX_ITEM / 2; i++) {
+            usedValues.add(i);
+            mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
+        }
+
+        itemProducer = domDataTreeService.createProducer(
+                Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
+
+        final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
+        final DOMDataTreeWriteCursor cursor =
+                tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey));
+
+        final MapNode list = mapBuilder.build();
+        cursor.write(list.getIdentifier(), list);
+        cursor.close();
+
+        try {
+            tx.submit().checkedGet();
+        } catch (final TransactionCommitFailedException e) {
+            LOG.warn("Unable to fill the initial item list.", e);
+            settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
+            return false;
+        }
+
+        return true;
+    }
+
+    private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
+        final int i = random.nextInt(MAX_ITEM + 1);
+
+        final YangInstanceIdentifier entryId =
+                idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
+
+        final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
+        final DOMDataTreeWriteCursor cursor = tx.createCursor(
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(ITEM)));
+        allTx++;
+
+        if (usedValues.contains(i)) {
+            LOG.debug("Deleting item: {}", i);
+            deleteTx++;
+            cursor.delete(entryId.getLastPathArgument());
+            usedValues.remove(i);
+
+        } else {
+            LOG.debug("Inserting item: {}", i);
+            insertTx++;
+            final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
+            cursor.write(entryId.getLastPathArgument(), entry);
+            usedValues.add(i);
+        }
+
+        cursor.close();
+
+        return tx.submit();
+    }
+
+    private void maybeFinish(final long current) {
+        if ((current - startTime) > timeToTake) {
+            LOG.debug("Reached max running time, waiting for futures to complete.");
+            scheduledFuture.cancel(false);
+
+            final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
+
+            Futures.addCallback(allFutures, new FutureCallback<List<Void>>() {
+                @Override
+                public void onSuccess(@Nullable final List<Void> result) {
+                    LOG.debug("All futures completed successfully.");
+
+                    final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
+                            .setAllTx(allTx)
+                            .setInsertTx(insertTx)
+                            .setDeleteTx(deleteTx)
+                            .build();
+
+                    completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
+                            .withResult(output).build());
+
+                    executor.shutdown();
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    LOG.error("Write transactions failed.", t);
+                    completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+                            .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build());
+
+                    executor.shutdown();
+                }
+            });
+        }
+    }
+}
index 0fb755ee98c984f4e17e193cd2e65db5b12fcc41..43a31df09f77a48d403f0be557d61ecb2a07cea3 100644 (file)
@@ -15,6 +15,8 @@
   <reference id="normalizedNodeSerializer" interface="org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer"/>
   <reference id="notificationPublishService" interface="org.opendaylight.controller.md.sal.binding.api.NotificationPublishService" />
   <reference id="notificationListenerService" interface="org.opendaylight.controller.md.sal.binding.api.NotificationService" />
+  <reference id="domDataTreeService" interface="org.opendaylight.mdsal.dom.api.DOMDataTreeService"/>
+  <reference id="distributedShardFactory" interface="org.opendaylight.controller.cluster.sharding.DistributedShardFactory"/>
 
   <bean id="purchaseCarProvider" class="org.opendaylight.controller.clustering.it.provider.PurchaseCarProvider" >
     <property name="notificationProvider" ref="notificationService"/>
@@ -57,6 +59,8 @@
     <argument ref="notificationPublishService"/>
     <argument ref="notificationListenerService"/>
     <argument ref="domDataBroker"/>
+    <argument ref="domDataTreeService"/>
+    <argument ref="distributedShardFactory"/>
   </bean>
 
 </blueprint>