Bug 7801 - Implement agent RPCs for transaction writer testing 07/53007/8
authorTomas Cere <tcere@cisco.com>
Mon, 6 Mar 2017 15:11:26 +0000 (16:11 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 17 Mar 2017 18:21:51 +0000 (18:21 +0000)
Change-Id: I75e62deb62f39869be07fcb82f3faee53f337a7d
Signed-off-by: Tomas Cere <tcere@cisco.com>
opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java [new file with mode: 0644]
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/resources/org/opendaylight/blueprint/cluster-test-app.xml

index 5a7f2e12e7d46c96cfaaa3be0413149aff4e5209..2a5299d761e328624019d9ac021fc1c5accb9df8 100644 (file)
@@ -232,7 +232,23 @@ module odl-mdsal-lowlevel-control {
                 type boolean;
             }
         }
-        // No output.
+        output {
+            leaf all-tx {
+                description "Number of all transactions executed.";
+                type int64;
+                mandatory true;
+            }
+            leaf insert-tx {
+                description "Number of transactions that inserted data.";
+                type int64;
+                mandatory true;
+            }
+            leaf delete-tx {
+                description "Number of transactions that deleted data.";
+                type int64;
+                mandatory true;
+            }
+        }
     }
 
     rpc produce-transactions {
index 7ada4bea61090c1aa31c384de9a66c92e34c17c9..4ff42c14a1c6c76e1a5446811fca0e7d2fdc1758 100644 (file)
@@ -18,9 +18,11 @@ import org.opendaylight.controller.clustering.it.provider.impl.GetConstantServic
 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
+import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
@@ -50,6 +52,7 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -66,6 +69,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     private final RpcProviderRegistry rpcRegistry;
     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
+    private final DOMDataBroker domDataBroker;
     private final NotificationPublishService notificationPublishService;
     private final NotificationService notificationService;
     private final SchemaService schemaService;
@@ -87,7 +91,8 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
                                      final SchemaService schemaService,
                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
                                      final NotificationPublishService notificationPublishService,
-                                     final NotificationService notificationService) {
+                                     final NotificationService notificationService,
+                                     final DOMDataBroker domDataBroker) {
         this.rpcRegistry = rpcRegistry;
         this.domRpcService = domRpcService;
         this.singletonService = singletonService;
@@ -95,6 +100,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
         this.notificationPublishService = notificationPublishService;
         this.notificationService = notificationService;
+        this.domDataBroker = domDataBroker;
 
         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
     }
@@ -144,8 +150,15 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> writeTransactions(WriteTransactionsInput input) {
-        return null;
+    public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
+        LOG.debug("write-transactions, input: {}", input);
+
+        final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
+
+        final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
+        writeTransactionsHandler.start(settableFuture);
+
+        return settableFuture;
     }
 
     @Override
diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java
new file mode 100644 (file)
index 0000000..fd47b71
--- /dev/null
@@ -0,0 +1,341 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.clustering.it.provider.impl;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SplittableRandom;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WriteTransactionsHandler implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
+    private static final int SECOND_AS_NANO = 1000000000;
+    //2^20 as in the model
+    private static final int MAX_ITEM = 1048576;
+
+    private static final QName ID_INTS =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
+    private static final QName ID =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
+    private static final QName ITEM =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
+    private static final QName NUMBER =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
+
+    public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
+
+    private final DOMDataBroker domDataBroker;
+    private final Long timeToTake;
+    private final Long delay;
+    private final String id;
+    private final WriteTransactionsInput input;
+
+    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
+    private final Set<Integer> usedValues = new HashSet<>();
+
+    private RandomnessProvider random;
+    private TxProvider txProvider;
+
+    private long startTime;
+    private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
+
+    private long allTx = 0;
+    private long insertTx = 0;
+    private long deleteTx = 0;
+    private ScheduledFuture<?> scheduledFuture;
+    private YangInstanceIdentifier idListWithKey;
+
+    public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
+        this.domDataBroker = domDataBroker;
+        this.input = input;
+
+        timeToTake = input.getSeconds() * SECOND_AS_NANO;
+        delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
+        id = input.getId();
+    }
+
+    @Override
+    public void run() {
+        final long current = System.nanoTime();
+
+        futures.add(execWrite());
+
+        maybeFinish(current);
+    }
+
+    public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
+        LOG.debug("Starting write-transactions.");
+
+        if (input.isChainedTransactions()) {
+            txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
+            random = new BasicProvider();
+        } else {
+            txProvider = new DataBrokerBackedProvider(domDataBroker);
+            random = new NonConflictingProvider();
+        }
+
+        if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
+            startTime = System.nanoTime();
+            completionFuture = settableFuture;
+            scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
+        } else {
+            executor.shutdown();
+        }
+    }
+
+    private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
+
+        final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id)
+                .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
+                .build();
+        final MapNode mapNode =
+                ImmutableNodes.mapNodeBuilder(ID_INTS)
+                        .withChild(entry)
+                        .build();
+
+        final DOMDataWriteTransaction tx = txProvider.createTransaction();
+        idListWithKey = ID_INTS_YID.node(entry.getIdentifier());
+        tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, mapNode);
+
+        try {
+            tx.submit().checkedGet();
+        } catch (final TransactionCommitFailedException e) {
+            LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
+            settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
+            return false;
+        }
+
+        return true;
+    }
+
+    private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
+        LOG.debug("Filling the item list with initial values.");
+
+        final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
+        for (int i = 0; i < MAX_ITEM / 2; i++) {
+            usedValues.add(i);
+            mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
+        }
+
+        final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM);
+        final DOMDataWriteTransaction tx = txProvider.createTransaction();
+        tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
+
+        try {
+            tx.submit().checkedGet();
+        } catch (final TransactionCommitFailedException e) {
+            LOG.warn("Unable to fill the initial item list.", e);
+            settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
+            return false;
+        }
+
+        return true;
+    }
+
+    private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
+        final int i = random.nextInt(MAX_ITEM + 1);
+
+        final YangInstanceIdentifier entryId =
+                idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
+
+        final DOMDataWriteTransaction tx = txProvider.createTransaction();
+        allTx++;
+
+        if (usedValues.contains(i)) {
+            LOG.debug("Deleting item: {}", i);
+            deleteTx++;
+            tx.delete(LogicalDatastoreType.CONFIGURATION, entryId);
+            usedValues.remove(i);
+
+        } else {
+            LOG.debug("Inserting item: {}", i);
+            insertTx++;
+            final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
+            tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry);
+            usedValues.add(i);
+        }
+
+        return tx.submit();
+    }
+
+    private void maybeFinish(final long current) {
+        if ((current - startTime) > timeToTake) {
+            LOG.debug("Reached max running time, waiting for futures to complete.");
+            scheduledFuture.cancel(false);
+
+            final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
+
+            Futures.addCallback(allFutures, new FutureCallback<List<Void>>() {
+                @Override
+                public void onSuccess(@Nullable final List<Void> result) {
+                    LOG.debug("All futures completed successfully.");
+
+                    final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
+                            .setAllTx(allTx)
+                            .setInsertTx(insertTx)
+                            .setDeleteTx(deleteTx)
+                            .build();
+
+                    completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
+                            .withResult(output).build());
+
+                    executor.shutdown();
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    LOG.error("Write transactions failed.", t);
+                    completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+                            .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build());
+
+                    executor.shutdown();
+                }
+            });
+        }
+    }
+
+    private interface RandomnessProvider {
+        int nextInt(int bound);
+    }
+
+    private static class NonConflictingProvider implements RandomnessProvider {
+
+        private final SplittableRandom random = new SplittableRandom();
+        private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
+
+        @Override
+        public int nextInt(int bound) {
+            int nextInt;
+            do {
+                nextInt = random.nextInt(bound);
+            } while (previousNumbers.contains(nextInt));
+
+            if (previousNumbers.size() > 100000) {
+                previousNumbers.iterator().remove();
+            }
+            previousNumbers.add(nextInt);
+
+            return nextInt;
+        }
+    }
+
+    private static class BasicProvider implements RandomnessProvider {
+
+        private final SplittableRandom random = new SplittableRandom();
+
+        @Override
+        public int nextInt(int bound) {
+            return random.nextInt(bound);
+        }
+    }
+
+    private interface TxProvider {
+
+        DOMDataWriteTransaction createTransaction();
+    }
+
+    private static class TxChainBackedProvider implements TxProvider {
+
+        private final DOMTransactionChain transactionChain;
+
+        TxChainBackedProvider(final DOMDataBroker dataBroker,
+                              final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
+                              final ScheduledExecutorService executor) {
+
+            transactionChain =
+                    dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
+        }
+
+        @Override
+        public DOMDataWriteTransaction createTransaction() {
+            return transactionChain.newWriteOnlyTransaction();
+        }
+    }
+
+    private static class DataBrokerBackedProvider implements TxProvider {
+
+        private final DOMDataBroker dataBroker;
+
+        DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
+            this.dataBroker = dataBroker;
+        }
+
+        @Override
+        public DOMDataWriteTransaction createTransaction() {
+            return dataBroker.newWriteOnlyTransaction();
+        }
+    }
+
+    private static class TestChainListener implements TransactionChainListener {
+
+        private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
+        private final ScheduledExecutorService executor;
+
+        TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
+                          final ScheduledExecutorService executor) {
+
+            this.resultFuture = resultFuture;
+            this.executor = executor;
+        }
+
+        @Override
+        public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
+                                             final AsyncTransaction<?, ?> transaction,
+                                             final Throwable cause) {
+            LOG.warn("Transaction chain failed.", cause);
+            resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
+
+            executor.shutdown();
+        }
+
+        @Override
+        public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+            LOG.debug("Transaction chain closed successfully.");
+        }
+    }
+}
index cb7a29a1e314258eef9db888f5d198c655f36271..0fb755ee98c984f4e17e193cd2e65db5b12fcc41 100644 (file)
@@ -56,6 +56,7 @@
     <argument ref="normalizedNodeSerializer"/>
     <argument ref="notificationPublishService"/>
     <argument ref="notificationListenerService"/>
+    <argument ref="domDataBroker"/>
   </bean>
 
 </blueprint>