BUG-8494: Cleanup clustering-it-provider 46/59846/2
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 23 Jun 2017 11:02:40 +0000 (13:02 +0200)
committerRobert Varga <nite@hq.sk>
Mon, 3 Jul 2017 11:08:18 +0000 (11:08 +0000)
Fixes various warnings and refactors MdsalLowLevelTestProvider
to be slightly cleaner in terms of number of classes.

It also eliminates synchronous thread blocking on future collection
and instead schedules task which performs the cleanup if the system
gets stuck.

Change-Id: I657f3df60c620284538bdf39ab1536eac8448801
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit d97061af6814ad7b085af10797a252aa4aa5cda6)

13 files changed:
opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataChangeListener.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataTreeChangeListener.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarEntryDataTreeCommitCohort.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/PeopleProvider.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java [new file with mode: 0644]
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/RoutedGetConstantService.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/YnlListener.java

index 89d220bcf519ec590e783e63d31e04c3099dbd1d..aa2f2da7dd0fdb9746328321aaf921594786f349 100644 (file)
@@ -227,6 +227,37 @@ module odl-mdsal-lowlevel-control {
         }
     }
 
+    grouping transactions-params {
+        leaf seconds {
+            description "This RPC has to work (roughly) this long.";
+            mandatory true;
+            type uint32;
+        }
+        leaf transactions-per-second {
+            description "An upper limit of transactions per second this RPC shall try to achieve.";
+            mandatory true;
+            type uint32;
+        }
+    }
+
+    grouping transactions-result {
+        leaf all-tx {
+            description "Number of all transactions executed.";
+            type int64;
+            mandatory true;
+        }
+        leaf insert-tx {
+            description "Number of transactions that inserted data.";
+            type int64;
+            mandatory true;
+        }
+        leaf delete-tx {
+            description "Number of transactions that deleted data.";
+            type int64;
+            mandatory true;
+        }
+    }
+
     rpc write-transactions {
         description "Upon receiving this, the member shall make sure the outer list item
             of llt:id-ints exists for the given id, and then start creating (one by one)
@@ -245,16 +276,7 @@ module odl-mdsal-lowlevel-control {
             OptimisticLockException is always considered an error.";
         input {
             uses llc:id-grouping;
-            leaf seconds {
-                description "This RPC has to work (roughly) this long.";
-                mandatory true;
-                type uint32;
-            }
-            leaf transactions-per-second {
-                description "An upper limit of transactions per second this RPC shall try to achieve.";
-                mandatory true;
-                type uint32;
-            }
+            uses transactions-params;
             leaf chained-transactions {
                 description "If true, write transactions shall be created on a transaction chain,
                     (created at start of the RPC call, and deleted at at its end).
@@ -264,21 +286,7 @@ module odl-mdsal-lowlevel-control {
             }
         }
         output {
-            leaf all-tx {
-                description "Number of all transactions executed.";
-                type int64;
-                mandatory true;
-            }
-            leaf insert-tx {
-                description "Number of transactions that inserted data.";
-                type int64;
-                mandatory true;
-            }
-            leaf delete-tx {
-                description "Number of transactions that deleted data.";
-                type int64;
-                mandatory true;
-            }
+            uses transactions-result;
         }
     }
 
@@ -305,16 +313,7 @@ module odl-mdsal-lowlevel-control {
             but the shard and the whole id item shall be kept as they are.";
         input {
             uses llc:id-grouping;
-            leaf seconds {
-                description "This RPC has to work (roughly) this long.";
-                mandatory true;
-                type uint32;
-            }
-            leaf transactions-per-second {
-                description "An upper limit of transactions per second this RPC shall try to achieve.";
-                mandatory true;
-                type uint32;
-            }
+            uses transactions-params;
             leaf isolated-transactions {
                 description "The value for DOMDataTreeProducer#createTransaction argument.";
                 mandatory true;
@@ -322,21 +321,7 @@ module odl-mdsal-lowlevel-control {
             }
         }
         output {
-            leaf all-tx {
-                description "Number of all transactions executed.";
-                type int64;
-                mandatory true;
-            }
-            leaf insert-tx {
-                description "Number of transactions that inserted data.";
-                type int64;
-                mandatory true;
-            }
-            leaf delete-tx {
-                description "Number of transactions that deleted data.";
-                type int64;
-                mandatory true;
-            }
+            uses transactions-result;
         }
     }
 
index ac5c368927ea87f509ff490c66df664c1322a700..22def4c3b7f39e22bdc4ba5a25a3e31eea59bfc2 100644 (file)
@@ -27,14 +27,14 @@ public class CarDataChangeListener implements DataChangeListener {
     private static final Logger LOG = LoggerFactory.getLogger(CarDataChangeListener.class);
 
     @Override
-    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+    public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("onDataChanged invoked");
             outputChanges(change);
         }
     }
 
-    private void outputChanges(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+    private static void outputChanges(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
         final Map<InstanceIdentifier<?>, DataObject> originalData = change.getOriginalData() != null ?
                 change.getOriginalData(): Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
         final Map<InstanceIdentifier<?>, DataObject> updatedData = change.getUpdatedData() != null ?
index c157b6cd8997cb892e90e3da3c315433ad0088ae..a10e22f87152f3c438f0d07d49db575b4b781cb1 100644 (file)
@@ -8,13 +8,9 @@
 
 package org.opendaylight.controller.clustering.it.provider;
 
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.DELETE;
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED;
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.WRITE;
-
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
@@ -32,7 +28,7 @@ public class CarDataTreeChangeListener implements DataTreeChangeListener<Cars> {
     private static final Logger LOG = LoggerFactory.getLogger(CarDataTreeChangeListener.class);
 
     @java.lang.Override
-    public void onDataTreeChanged(@Nonnull java.util.Collection<DataTreeModification<Cars>> changes) {
+    public void onDataTreeChanged(@Nonnull final java.util.Collection<DataTreeModification<Cars>> changes) {
         if (LOG.isTraceEnabled()) {
             for (DataTreeModification<Cars> change : changes) {
                 ouputChanges(change);
@@ -40,7 +36,7 @@ public class CarDataTreeChangeListener implements DataTreeChangeListener<Cars> {
         }
     }
 
-    private void ouputChanges(final DataTreeModification<Cars> change) {
+    private static void ouputChanges(final DataTreeModification<Cars> change) {
         final DataObjectModification<Cars> rootNode = change.getRootNode();
         final ModificationType modificationType = rootNode.getModificationType();
         final InstanceIdentifier<Cars> rootIdentifier = change.getRootPath().getRootIdentifier();
index 1bc6581b91231daf0073df21b2277d2da015c13b..270015956294336da2be6cc6c1763dc228a6e28a 100644 (file)
@@ -39,10 +39,9 @@ public class CarEntryDataTreeCommitCohort implements DOMDataTreeCommitCohort {
     private static final QName YEAR_QNAME = QName.create(Cars.QNAME, "year").intern();
     private static final NodeIdentifier YEAR_NODE_ID = new NodeIdentifier(YEAR_QNAME);
 
-    @SuppressWarnings("unchecked")
     @Override
-    public CheckedFuture<PostCanCommitStep, DataValidationFailedException> canCommit(Object txId,
-            DOMDataTreeCandidate candidate, SchemaContext ctx) {
+    public CheckedFuture<PostCanCommitStep, DataValidationFailedException> canCommit(final Object txId,
+            final DOMDataTreeCandidate candidate, final SchemaContext ctx) {
 
         // Simple data validation - verify the year, if present, is >= 1990
 
@@ -77,6 +76,6 @@ public class CarEntryDataTreeCommitCohort implements DOMDataTreeCommitCohort {
 
         // Return the noop PostCanCommitStep as we're only validating input data and not participating in the
         // remaining 3PC stages (pre-commit and commit).
-        return (CheckedFuture<PostCanCommitStep, DataValidationFailedException>) PostCanCommitStep.NOOP_SUCCESS_FUTURE;
+        return PostCanCommitStep.NOOP_SUCCESS_FUTURE;
     }
 }
index ec4c1bb1618e0f79f3a3848fa9b2850e2ee06da9..ea9f7b1d762e3dde09cb24a36d18dd4b397494e6 100644 (file)
@@ -20,7 +20,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
@@ -63,7 +62,7 @@ public class CarProvider implements CarService {
     private static final Logger LOG_CAR_PROVIDER = LoggerFactory.getLogger(CarProvider.class);
 
     private static final String ENTITY_TYPE = "cars";
-    private static final InstanceIdentifier CARS_IID = InstanceIdentifier.builder(Cars.class).build();
+    private static final InstanceIdentifier<Cars> CARS_IID = InstanceIdentifier.builder(Cars.class).build();
     private static final DataTreeIdentifier<Cars> CARS_DTID = new DataTreeIdentifier<>(
             LogicalDatastoreType.CONFIGURATION, CARS_IID);
 
@@ -76,7 +75,7 @@ public class CarProvider implements CarService {
     private final CarEntityOwnershipListener ownershipListener = new CarEntityOwnershipListener();
     private final AtomicBoolean registeredListener = new AtomicBoolean();
 
-    private final Collection<ListenerRegistration<DataChangeListener>> carsDclRegistrations =
+    private final Collection<ListenerRegistration<?>> carsDclRegistrations =
             Sets.newConcurrentHashSet();
     private final Collection<ListenerRegistration<CarDataTreeChangeListener>> carsDtclRegistrations =
             Sets.newConcurrentHashSet();
@@ -86,8 +85,8 @@ public class CarProvider implements CarService {
     private final AtomicReference<DOMDataTreeCommitCohortRegistration<CarEntryDataTreeCommitCohort>> commitCohortReg =
             new AtomicReference<>();
 
-    public CarProvider(DataBroker dataProvider, EntityOwnershipService ownershipService,
-            DOMDataBroker domDataBroker) {
+    public CarProvider(final DataBroker dataProvider, final EntityOwnershipService ownershipService,
+            final DOMDataBroker domDataBroker) {
         this.dataProvider = dataProvider;
         this.ownershipService = ownershipService;
         this.domDataBroker = domDataBroker;
@@ -110,7 +109,7 @@ public class CarProvider implements CarService {
     }
 
     @Override
-    public Future<RpcResult<Void>> stressTest(StressTestInput input) {
+    public Future<RpcResult<Void>> stressTest(final StressTestInput input) {
         final int inputRate;
         final long inputCount;
 
@@ -120,10 +119,9 @@ public class CarProvider implements CarService {
             return Futures.immediateFuture(RpcResultBuilder.<Void>failed()
                     .withError(ErrorType.PROTOCOL, "invalid rate")
                     .build());
-        } else {
-            inputRate = input.getRate();
         }
 
+        inputRate = input.getRate();
         if (input.getCount() != null) {
             inputCount = input.getCount();
         } else {
@@ -218,7 +216,7 @@ public class CarProvider implements CarService {
 
 
     @Override
-    public Future<RpcResult<Void>> registerOwnership(RegisterOwnershipInput input) {
+    public Future<RpcResult<Void>> registerOwnership(final RegisterOwnershipInput input) {
         if(registeredListener.compareAndSet(false, true)) {
             ownershipService.registerListener(ENTITY_TYPE, ownershipListener);
         }
@@ -235,13 +233,13 @@ public class CarProvider implements CarService {
     }
 
     @Override
-    public Future<RpcResult<Void>> unregisterOwnership(UnregisterOwnershipInput input) {
+    public Future<RpcResult<Void>> unregisterOwnership(final UnregisterOwnershipInput input) {
         return RpcResultBuilder.<Void>success().buildFuture();
     }
 
     private static class CarEntityOwnershipListener implements EntityOwnershipListener {
         @Override
-        public void ownershipChanged(EntityOwnershipChange ownershipChange) {
+        public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
             LOG_CAR_PROVIDER.info("ownershipChanged: {}", ownershipChange);
         }
     }
@@ -278,7 +276,7 @@ public class CarProvider implements CarService {
         LOG_CAR_PROVIDER.info("Unregistering the CarDataChangeListener(s)");
         synchronized (carsDclRegistrations) {
             int numListeners = 0;
-            for (ListenerRegistration<DataChangeListener> carsDclRegistration : carsDclRegistrations) {
+            for (ListenerRegistration<?> carsDclRegistration : carsDclRegistrations) {
                 carsDclRegistration.close();
                 numListeners++;
             }
index a93d99fa45ac028de4ee0b235c41ae63e5027588..e0e8d99d1aab3eac8df847bf7075de6e15b0257e 100644 (file)
@@ -137,17 +137,17 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     private final DOMDataTreeChangeService domDataTreeChangeService;
     private final ActorSystem actorSystem;
 
-    private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
+    private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
             new HashMap<>();
 
-    private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
+    private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
 
     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
     private FlappingSingletonService flappingSingletonService;
     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
     private IdIntsListener idIntsListener;
-    private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
+    private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
     private IdIntsDOMDataTreeLIstener idIntsDdtl;
 
@@ -252,13 +252,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     @Override
     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
         LOG.debug("write-transactions, input: {}", input);
-
-        final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
-
-        final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
-        writeTransactionsHandler.start(settableFuture);
-
-        return settableFuture;
+        return WriteTransactionsHandler.start(domDataBroker, input);
     }
 
     @Override
@@ -267,7 +261,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+    public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
         return null;
     }
 
@@ -339,7 +333,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
+    public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
         return null;
     }
 
@@ -381,7 +375,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
+    public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
         return null;
     }
 
@@ -573,14 +567,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     @Override
     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
         LOG.debug("producer-transactions, input: {}", input);
-
-        final ProduceTransactionsHandler handler =
-                new ProduceTransactionsHandler(domDataTreeService, input);
-
-        final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
-        handler.start(settableFuture);
-
-        return settableFuture;
+        return ProduceTransactionsHandler.start(domDataTreeService, input);
     }
 
     @Override
@@ -601,7 +588,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
 
-        final InstanceIdentifier shardPrefix = input.getPrefix();
+        final InstanceIdentifier<?> shardPrefix = input.getPrefix();
 
         if (shardPrefix == null) {
             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
index 48385b4e2f95cf944565845d9858c682a99392e4..09822831b2c300865ca209eb23f7c248702a6271 100644 (file)
@@ -43,12 +43,12 @@ public class PeopleProvider implements PeopleService, AutoCloseable {
   }
 
 
-  public void setRpcRegistration(BindingAwareBroker.RoutedRpcRegistration<CarPurchaseService> rpcRegistration) {
+  public void setRpcRegistration(final BindingAwareBroker.RoutedRpcRegistration<CarPurchaseService> rpcRegistration) {
     this.rpcRegistration = rpcRegistration;
   }
 
   @Override
-  public Future<RpcResult<Void>> addPerson(AddPersonInput input) {
+  public Future<RpcResult<Void>> addPerson(final AddPersonInput input) {
     LOG.info("RPC addPerson : adding person [{}]", input);
 
     PersonBuilder builder = new PersonBuilder(input);
@@ -59,7 +59,7 @@ public class PeopleProvider implements PeopleService, AutoCloseable {
     final InstanceIdentifier.InstanceIdentifierBuilder<Person> personIdBuilder =
         InstanceIdentifier.<People>builder(People.class)
             .child(Person.class, person.getKey());
-    final InstanceIdentifier personId = personIdBuilder.build();
+    final InstanceIdentifier<Person> personId = personIdBuilder.build();
     // Place entry in data store tree
     WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
     tx.put(LogicalDatastoreType.CONFIGURATION, personId, person, true);
diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java
new file mode 100644 (file)
index 0000000..f23e7ec
--- /dev/null
@@ -0,0 +1,224 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.clustering.it.provider.impl;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class AbstractTransactionHandler {
+    private abstract static class Phase {
+        abstract void txSuccess(ListenableFuture<Void> execFuture, long txId);
+
+        abstract void txFailure(ListenableFuture<Void> execFuture, long txId, Throwable cause);
+    }
+
+    private static final class Running extends Phase {
+        private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
+        private Throwable failure;
+
+        void addFuture(final ListenableFuture<Void> execFuture) {
+            futures.add(execFuture);
+        }
+
+        @Override
+        void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+            futures.remove(execFuture);
+        }
+
+        @Override
+        void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+            futures.remove(execFuture);
+            if (failure != null) {
+                failure = cause;
+            }
+        }
+
+        Optional<Throwable> getFailure() {
+            return Optional.ofNullable(failure);
+        }
+    }
+
+    private final class Collecting extends Phase {
+        private final List<ListenableFuture<Void>> futures;
+        private boolean done;
+
+        Collecting(final Collection<ListenableFuture<Void>> futures) {
+            this.futures = new ArrayList<>(futures);
+        }
+
+        @Override
+        void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+            futures.remove(execFuture);
+            if (futures.isEmpty() && !done) {
+                LOG.debug("All futures completed successfully.");
+                runSuccessful(txCounter);
+            }
+        }
+
+        @Override
+        void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+            futures.remove(execFuture);
+            done = true;
+            runFailed(cause);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
+
+    static final int SECOND_AS_NANO = 1000000000;
+    //2^20 as in the model
+    static final int MAX_ITEM = 1048576;
+
+    static final QName ID_INTS =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
+    static final QName ID =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
+    static final QName ITEM =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
+    static final QName NUMBER =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
+
+    public static final QName ID_INT =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
+    public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
+    public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
+
+    static final long INIT_TX_TIMEOUT_SECONDS = 125;
+
+    private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
+
+    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    private final Stopwatch stopwatch = Stopwatch.createStarted();
+    private final long runtimeNanos;
+    private final long delayNanos;
+
+    private ScheduledFuture<?> scheduledFuture;
+    private long txCounter;
+    @GuardedBy("this")
+    private Phase phase;
+
+    AbstractTransactionHandler(final TransactionsParams params) {
+        runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
+        delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond();
+    }
+
+    final synchronized void doStart() {
+        phase = new Running();
+        scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
+    }
+
+    private void execute() {
+        final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+        if (elapsed < runtimeNanos) {
+            // Not completed yet: create a transaction and hook it up
+            final long txId = txCounter++;
+            final ListenableFuture<Void> execFuture = execWrite(txId);
+
+            // Ordering is important: we need to add the future before hooking the callback
+            synchronized (this) {
+                ((Running) phase).addFuture(execFuture);
+            }
+            Futures.addCallback(execFuture, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(final Void result) {
+                    txSuccess(execFuture, txId);
+                }
+
+                @Override
+                public void onFailure(final Throwable cause) {
+                    txFailure(execFuture, txId, cause);
+                }
+            });
+        } else {
+            startCollection();
+        }
+    }
+
+    private synchronized void startCollection() {
+        scheduledFuture.cancel(false);
+
+        final Running running = (Running) phase;
+        final Optional<Throwable> failure = running.getFailure();
+        if (failure.isPresent()) {
+            executor.shutdown();
+            runFailed(failure.get());
+            return;
+        }
+
+        LOG.debug("Reached maximum run time with {} outstanding futures", running.futures.size());
+        if (running.futures.isEmpty()) {
+            executor.shutdown();
+            runSuccessful(txCounter);
+            return;
+        }
+
+        phase = new Collecting(running.futures);
+        executor.schedule(this::checkCollection, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        executor.shutdown();
+    }
+
+    final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+        LOG.debug("Future #{} completed successfully", txId);
+        phase.txSuccess(execFuture, txId);
+    }
+
+    final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+        LOG.debug("Future #{} failed", txId, cause);
+        phase.txFailure(execFuture, txId, cause);
+    }
+
+    private synchronized void checkCollection() {
+        final Collecting collecting = (Collecting) phase;
+        if (!collecting.done) {
+            final int size = collecting.futures.size();
+            for (int i = 0; i < size; i++) {
+                final ListenableFuture<Void> future = collecting.futures.get(i);
+
+                try {
+                    future.get(0, TimeUnit.NANOSECONDS);
+                } catch (final TimeoutException e) {
+                    LOG.warn("Future #{}/{} not completed yet", i, size);
+                } catch (final ExecutionException e) {
+                    LOG.warn("Future #{}/{} failed", i, size, e.getCause());
+                } catch (final InterruptedException e) {
+                    LOG.warn("Interrupted while examining future #{}/{}", i, size, e);
+                }
+            }
+
+            runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
+        }
+    }
+
+    abstract ListenableFuture<Void> execWrite(final long txId);
+
+    abstract void runFailed(Throwable cause);
+
+    abstract void runSuccessful(long allTx);
+
+    abstract void runTimedOut(Exception cause);
+}
index 8f711b3337c9fbdfc8087aa138df3d5a2712d007..4ffb415d82a5eea32a8b168aa02fcab7ef3994d4 100644 (file)
@@ -8,10 +8,10 @@
 
 package org.opendaylight.controller.clustering.it.provider.impl;
 
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID;
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID_INT;
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID_INTS;
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ITEM;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INT;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INTS;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ITEM;
 
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
@@ -38,7 +38,6 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -94,13 +93,13 @@ public class PrefixShardHandler {
                 final CheckedFuture<Void, TransactionCommitFailedException> ensureFuture = ensureListExists();
                 Futures.addCallback(ensureFuture, new FutureCallback<Void>() {
                     @Override
-                    public void onSuccess(@Nullable Void result) {
+                    public void onSuccess(@Nullable final Void result) {
                         LOG.debug("Initial list write successful.");
                         future.set(RpcResultBuilder.<Void>success().build());
                     }
 
                     @Override
-                    public void onFailure(Throwable throwable) {
+                    public void onFailure(final Throwable throwable) {
                         LOG.warn("Shard[{}] creation failed:", identifier, throwable);
 
                         final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed",
@@ -189,7 +188,7 @@ public class PrefixShardHandler {
         final CheckedFuture<Void, TransactionCommitFailedException> future = tx.submit();
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
-            public void onSuccess(@Nullable Void result) {
+            public void onSuccess(@Nullable final Void result) {
                 try {
                     LOG.debug("Closing producer for initial list.");
                     producer.close();
@@ -199,7 +198,7 @@ public class PrefixShardHandler {
             }
 
             @Override
-            public void onFailure(Throwable throwable) {
+            public void onFailure(final Throwable throwable) {
                 //NOOP handled by the caller of this method.
             }
         });
index 2a1f5ae47d24c986d1f9e04844b86f13f4a623f0..b1348c2fc42c2fc03f6be77bb9aafe9de864d734 100644 (file)
@@ -8,23 +8,15 @@
 
 package org.opendaylight.controller.clustering.it.provider.impl;
 
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.SplittableRandom;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
@@ -35,7 +27,6 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -47,81 +38,34 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ProduceTransactionsHandler implements Runnable {
-
+public class ProduceTransactionsHandler extends AbstractTransactionHandler {
     private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
-    private static final int SECOND_AS_NANO = 1000000000;
-    //2^20 as in the model
-    private static final int MAX_ITEM = 1048576;
-
-    static final QName ID_INTS =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
-    public static final QName ID_INT =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
-    static final QName ID =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
-    static final QName ITEM =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
-    private static final QName NUMBER =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number".intern());
-
-    public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
-    public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
 
-    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    private final List<ListenableFuture<Void>> futures = new ArrayList<>();
-    private final Set<Integer> usedValues = new HashSet<>();
+    private final SettableFuture<RpcResult<ProduceTransactionsOutput>> future = SettableFuture.create();
     private final SplittableRandom random = new SplittableRandom();
+    private final Set<Integer> usedValues = new HashSet<>();
+    private final DOMDataTreeIdentifier idListItem;
+    private final DOMDataTreeProducer itemProducer;
 
-    private final DOMDataTreeService domDataTreeService;
-    private final long runtimeNanos;
-    private final long delayNanos;
-    private final String id;
-
-    private SettableFuture<RpcResult<ProduceTransactionsOutput>> completionFuture;
-    private Stopwatch stopwatch;
-
-    private long allTx = 0;
     private long insertTx = 0;
     private long deleteTx = 0;
-    private ScheduledFuture<?> scheduledFuture;
-    private DOMDataTreeProducer itemProducer;
-    private DOMDataTreeIdentifier idListItem;
-
-    public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService,
-                                      final ProduceTransactionsInput input) {
-
-        this.domDataTreeService = domDataTreeService;
 
-        runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds());
-        delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond();
-        id = input.getId();
-    }
-
-    @Override
-    public void run() {
-        futures.add(execWrite(futures.size()));
-        maybeFinish();
+    private ProduceTransactionsHandler(final DOMDataTreeProducer producer, final DOMDataTreeIdentifier idListItem,
+            final ProduceTransactionsInput input) {
+        super(input);
+        this.itemProducer = Preconditions.checkNotNull(producer);
+        this.idListItem = Preconditions.checkNotNull(idListItem);
     }
 
-    public void start(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
-        completionFuture = settableFuture;
-
-        if (fillInitialList(completionFuture)) {
-            stopwatch = Stopwatch.createStarted();
-            scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS);
-        } else {
-            executor.shutdown();
-        }
-    }
-
-    private boolean fillInitialList(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
-        LOG.debug("Filling the item list with initial values.");
+    public static ListenableFuture<RpcResult<ProduceTransactionsOutput>> start(
+            final DOMDataTreeService domDataTreeService, final ProduceTransactionsInput input) {
+        final String id = input.getId();
+        LOG.debug("Filling the item list {} with initial values.", id);
 
         final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id));
 
-        itemProducer = domDataTreeService.createProducer(
-                Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
+        final DOMDataTreeProducer itemProducer = domDataTreeService.createProducer(
+            Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
 
         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
         final DOMDataTreeWriteCursor cursor =
@@ -131,33 +75,34 @@ public class ProduceTransactionsHandler implements Runnable {
         cursor.write(list.getIdentifier(), list);
         cursor.close();
 
-        idListItem = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
-            idListWithKey.node(list.getIdentifier()).toOptimized());
-
         try {
-            tx.submit().checkedGet(125, TimeUnit.SECONDS);
-            return true;
+            tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         } catch (final Exception e) {
             LOG.warn("Unable to fill the initial item list.", e);
-            settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
-        }
 
-        try {
-            itemProducer.close();
-        } catch (final DOMDataTreeProducerException exception) {
-            LOG.warn("Failure while closing producer.", exception);
+            try {
+                itemProducer.close();
+            } catch (final DOMDataTreeProducerException exception) {
+                LOG.warn("Failure while closing producer.", exception);
+            }
+
+            return Futures.immediateFuture(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+                .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
         }
-        return false;
+
+        final ProduceTransactionsHandler handler = new ProduceTransactionsHandler(itemProducer,
+            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(list.getIdentifier())
+                .toOptimized()), input);
+        handler.doStart();
+        return handler.future;
     }
 
-    private ListenableFuture<Void> execWrite(final int offset) {
+    @Override
+    ListenableFuture<Void> execWrite(final long txId) {
         final int i = random.nextInt(MAX_ITEM + 1);
         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
         final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem);
 
-        allTx++;
-
         final NodeIdentifierWithPredicates entryId = new NodeIdentifierWithPredicates(ITEM, NUMBER, i);
         if (usedValues.contains(i)) {
             LOG.debug("Deleting item: {}", i);
@@ -177,80 +122,30 @@ public class ProduceTransactionsHandler implements Runnable {
 
         cursor.close();
 
-        final ListenableFuture<Void> future = tx.submit();
-        if (LOG.isDebugEnabled()) {
-            Futures.addCallback(future, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(final Void result) {
-                    LOG.debug("Future #{} completed successfully", offset);
-                }
-
-                @Override
-                public void onFailure(final Throwable cause) {
-                    LOG.debug("Future #{} failed", offset, cause);
-                }
-            });
-        }
-
-        return future;
+        return tx.submit();
     }
 
-    private void maybeFinish() {
-        final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
-        if (elapsed >= runtimeNanos) {
-            LOG.debug("Reached max running time, waiting for futures to complete.");
-            scheduledFuture.cancel(false);
-
-            final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
-
-            try {
-                // Timeout from cds should be 2 minutes so leave some leeway.
-                allFutures.get(125, TimeUnit.SECONDS);
-
-                LOG.debug("All futures completed successfully.");
-
-                final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
-                        .setAllTx(allTx)
-                        .setInsertTx(insertTx)
-                        .setDeleteTx(deleteTx)
-                        .build();
-                completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
-                        .withResult(output).build());
-            } catch (ExecutionException e) {
-                LOG.error("Write transactions failed.", e.getCause());
-                completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build());
-            } catch (InterruptedException | TimeoutException e) {
-                LOG.error("Write transactions failed.", e);
-                completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION,
-                                "Final submit was timed out by the test provider or was interrupted", e).build());
-
-                for (int i = 0; i < futures.size(); i++) {
-                    final ListenableFuture<Void> future = futures.get(i);
+    @Override
+    void runFailed(final Throwable cause) {
+        future.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+            .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build());
+    }
 
-                    try {
-                        future.get(0, TimeUnit.NANOSECONDS);
-                    } catch (TimeoutException fe) {
-                        LOG.warn("Future #{}/{} not completed yet", i, futures.size());
-                    } catch (ExecutionException fe) {
-                        LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause());
-                    } catch (InterruptedException fe) {
-                        LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e);
-                    }
-                }
-            } catch (Exception e) {
-                LOG.error("Write transactions failed.", e);
-                completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
-            }
+    @Override
+    void runSuccessful(final long allTx) {
+        final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
+                .setAllTx(allTx)
+                .setInsertTx(insertTx)
+                .setDeleteTx(deleteTx)
+                .build();
+        future.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
+                .withResult(output).build());
+    }
 
-            executor.shutdown();
-            try {
-                itemProducer.close();
-            } catch (final DOMDataTreeProducerException e) {
-                LOG.warn("Failure while closing item producer.", e);
-            }
-        }
+    @Override
+    void runTimedOut(final Exception cause) {
+        future.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+            .withError(RpcError.ErrorType.APPLICATION,
+                    "Final submit was timed out by the test provider or was interrupted", cause).build());
     }
 }
index b4bc304a621eae488ad32ec610b70d6299c42c1d..92b741d929733317d1b7d02ca09b326b3e5c0ef8 100644 (file)
@@ -12,7 +12,6 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
index a026e6f2f3b00f8c536d099616acbff0962df356..797e252a914a6cdcd0a3811e1dcae9a4c12a2c6b 100644 (file)
@@ -8,22 +8,14 @@
 
 package org.opendaylight.controller.clustering.it.provider.impl;
 
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.SplittableRandom;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
@@ -38,7 +30,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -53,155 +44,163 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableCo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class WriteTransactionsHandler implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
-    private static final int SECOND_AS_NANO = 1000000000;
-    //2^20 as in the model
-    private static final int MAX_ITEM = 1048576;
-
-    private static final QName ID_INTS =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
-    private static final QName ID_INT =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
-    private static final QName ID =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
-    private static final QName ITEM =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
-    private static final QName NUMBER =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
-
-    public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
-    public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
-
-    private final WriteTransactionsInput input;
-
-    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    private final List<ListenableFuture<Void>> futures = new ArrayList<>();
-    private final Set<Integer> usedValues = new HashSet<>();
-
-    private RandomnessProvider random;
-    private TxProvider txProvider;
+public abstract class WriteTransactionsHandler extends AbstractTransactionHandler {
+    private static final class Chained extends WriteTransactionsHandler implements TransactionChainListener {
+        private final SplittableRandom random = new SplittableRandom();
+        private final DOMTransactionChain transactionChain;
 
-    private final DOMDataBroker domDataBroker;
-    private final Long runtimeNanos;
-    private final Long delayNanos;
-    private final String id;
+        Chained(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem,
+            final WriteTransactionsInput input) {
+            super(idListItem, input);
+            transactionChain = dataBroker.createTransactionChain(this);
+        }
 
-    private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
-    private Stopwatch stopwatch;
+        @Override
+        DOMDataWriteTransaction createTransaction() {
+            return transactionChain.newWriteOnlyTransaction();
+        }
 
-    private long allTx = 0;
-    private long insertTx = 0;
-    private long deleteTx = 0;
-    private ScheduledFuture<?> scheduledFuture;
-    private YangInstanceIdentifier idListItem;
+        @Override
+        int nextInt(final int bound) {
+            return random.nextInt(bound);
+        }
 
-    public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
-        this.domDataBroker = domDataBroker;
-        this.input = input;
+        @Override
+        public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
+                final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+            LOG.warn("Transaction chain failed.", cause);
+            completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
+        }
 
-        runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds());
-        delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond();
-        id = input.getId();
+        @Override
+        public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+            LOG.debug("Transaction chain closed successfully.");
+        }
     }
 
-    @Override
-    public void run() {
-        futures.add(execWrite(futures.size()));
-        maybeFinish();
-    }
+    private static final class Simple extends WriteTransactionsHandler {
+        private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
+        private final SplittableRandom random = new SplittableRandom();
+        private final DOMDataBroker dataBroker;
 
-    public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
-        LOG.debug("Starting write-transactions.");
+        Simple(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem,
+            final WriteTransactionsInput input) {
+            super(idListItem, input);
+            this.dataBroker = Preconditions.checkNotNull(dataBroker);
+        }
 
-        if (input.isChainedTransactions()) {
-            txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
-            random = new BasicProvider();
-        } else {
-            txProvider = new DataBrokerBackedProvider(domDataBroker);
-            random = new NonConflictingProvider();
+        @Override
+        DOMDataWriteTransaction createTransaction() {
+            return dataBroker.newWriteOnlyTransaction();
         }
 
-        if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
-            stopwatch = Stopwatch.createStarted();
-            completionFuture = settableFuture;
-            scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS);
-        } else {
-            executor.shutdown();
+        @Override
+        int nextInt(final int bound) {
+            int nextInt;
+            do {
+                nextInt = random.nextInt(bound);
+            } while (previousNumbers.contains(nextInt));
+
+            if (previousNumbers.size() > 100000) {
+                previousNumbers.iterator().remove();
+            }
+            previousNumbers.add(nextInt);
+
+            return nextInt;
         }
     }
 
-    private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
+    private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
+
+    final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture = SettableFuture.create();
+    private final Set<Integer> usedValues = new HashSet<>();
+    private final YangInstanceIdentifier idListItem;
+
+    private long insertTx = 0;
+    private long deleteTx = 0;
+
+    WriteTransactionsHandler(final YangInstanceIdentifier idListItem, final WriteTransactionsInput input) {
+        super(input);
+        this.idListItem = Preconditions.checkNotNull(idListItem);
+    }
+
+    public static ListenableFuture<RpcResult<WriteTransactionsOutput>> start(final DOMDataBroker domDataBroker,
+            final WriteTransactionsInput input) {
+        LOG.debug("Starting write-transactions.");
+
+        final String id = input.getId();
+        final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
+                .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
+                .build();
+        final YangInstanceIdentifier idListItem = ID_INT_YID.node(entry.getIdentifier());
 
         final ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
                 .withNodeIdentifier(new NodeIdentifier(ID_INTS))
                 .withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build())
                 .build();
 
-        DOMDataWriteTransaction tx = txProvider.createTransaction();
+        DOMDataWriteTransaction tx = domDataBroker.newWriteOnlyTransaction();
         // write only the top list
         tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode);
         try {
-            tx.submit().checkedGet(125, TimeUnit.SECONDS);
+            tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         } catch (final OptimisticLockFailedException e) {
             // when multiple write-transactions are executed concurrently we need to ignore this.
             // If we get optimistic lock here it means id-ints already exists and we can continue.
             LOG.debug("Got an optimistic lock when writing initial top level list element.", e);
         } catch (final TransactionCommitFailedException | TimeoutException e) {
             LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
-            settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+            return Futures.immediateFuture(RpcResultBuilder.<WriteTransactionsOutput>failed()
                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
-            return false;
         }
 
-        final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
-                .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
-                .build();
-
-        idListItem = ID_INT_YID.node(entry.getIdentifier());
-        tx = txProvider.createTransaction();
+        tx = domDataBroker.newWriteOnlyTransaction();
         tx.merge(LogicalDatastoreType.CONFIGURATION, idListItem, entry);
 
         try {
-            tx.submit().checkedGet(125, TimeUnit.SECONDS);
-            return true;
+            tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         } catch (final Exception e) {
             LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
-            settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+            return Futures.immediateFuture(RpcResultBuilder.<WriteTransactionsOutput>failed()
                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
-            return false;
         }
-    }
 
-    private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
         LOG.debug("Filling the item list with initial values.");
 
         final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
 
         final YangInstanceIdentifier itemListId = idListItem.node(ITEM);
-        final DOMDataWriteTransaction tx = txProvider.createTransaction();
+        tx = domDataBroker.newWriteOnlyTransaction();
         tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
 
         try {
-            tx.submit().checkedGet(125, TimeUnit.SECONDS);
-            return true;
+            tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         } catch (final Exception e) {
             LOG.warn("Unable to fill the initial item list.", e);
-            settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+            return Futures.immediateFuture(RpcResultBuilder.<WriteTransactionsOutput>failed()
                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
-            return false;
         }
+
+        final WriteTransactionsHandler handler;
+        if (input.isChainedTransactions()) {
+            handler = new Chained(domDataBroker, idListItem, input);
+        } else {
+            handler = new Simple(domDataBroker, idListItem, input);
+        }
+
+        handler.doStart();
+        return handler.completionFuture;
     }
 
-    private ListenableFuture<Void> execWrite(final int offset) {
-        final int i = random.nextInt(MAX_ITEM + 1);
+    @Override
+    ListenableFuture<Void> execWrite(final long txId) {
+        final int i = nextInt(MAX_ITEM + 1);
 
         final YangInstanceIdentifier entryId =
                 idListItem.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
 
-        final DOMDataWriteTransaction tx = txProvider.createTransaction();
-        allTx++;
+        final DOMDataWriteTransaction tx = createTransaction();
 
         if (usedValues.contains(i)) {
             LOG.debug("Deleting item: {}", i);
@@ -217,181 +216,35 @@ public class WriteTransactionsHandler implements Runnable {
             usedValues.add(i);
         }
 
-        final ListenableFuture<Void> future = tx.submit();
-        if (LOG.isDebugEnabled()) {
-            Futures.addCallback(future, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(final Void result) {
-                    LOG.debug("Future #{} completed successfully", offset);
-                }
-
-                @Override
-                public void onFailure(final Throwable cause) {
-                    LOG.debug("Future #{} failed", offset, cause);
-                }
-            });
-        }
-
-        return future;
-    }
-
-    private void maybeFinish() {
-        final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
-        if (elapsed >= runtimeNanos) {
-            LOG.debug("Reached max running time, waiting for futures to complete.");
-            scheduledFuture.cancel(false);
-
-            final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
-
-            try {
-                // Timeout from cds should be 2 minutes so leave some leeway.
-                allFutures.get(125, TimeUnit.SECONDS);
-
-                LOG.debug("All futures completed successfully.");
-
-                final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
-                        .setAllTx(allTx)
-                        .setInsertTx(insertTx)
-                        .setDeleteTx(deleteTx)
-                        .build();
-
-                completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
-                        .withResult(output).build());
-
-                executor.shutdown();
-            } catch (final ExecutionException e) {
-                LOG.error("Write transactions failed.", e.getCause());
-
-                completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build());
-            } catch (InterruptedException | TimeoutException e) {
-                LOG.error("Write transactions failed.", e);
-
-                completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION,
-                                "Final submit was timed out by the test provider or was interrupted", e).build());
-
-                for (int i = 0; i < futures.size(); i++) {
-                    final ListenableFuture<Void> future = futures.get(i);
-
-                    try {
-                        future.get(0, TimeUnit.NANOSECONDS);
-                    } catch (final TimeoutException fe) {
-                        LOG.warn("Future #{}/{} not completed yet", i, futures.size());
-                    } catch (final ExecutionException fe) {
-                        LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause());
-                    } catch (final InterruptedException fe) {
-                        LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e);
-                    }
-                }
-            } catch (Exception exception) {
-                LOG.error("Write transactions failed.", exception);
-                completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build());
-
-                executor.shutdown();
-            }
-        }
-    }
-
-    private interface RandomnessProvider {
-        int nextInt(int bound);
-    }
-
-    private static class NonConflictingProvider implements RandomnessProvider {
-
-        private final SplittableRandom random = new SplittableRandom();
-        private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
-
-        @Override
-        public int nextInt(int bound) {
-            int nextInt;
-            do {
-                nextInt = random.nextInt(bound);
-            } while (previousNumbers.contains(nextInt));
-
-            if (previousNumbers.size() > 100000) {
-                previousNumbers.iterator().remove();
-            }
-            previousNumbers.add(nextInt);
-
-            return nextInt;
-        }
+        return tx.submit();
     }
 
-    private static class BasicProvider implements RandomnessProvider {
-
-        private final SplittableRandom random = new SplittableRandom();
-
-        @Override
-        public int nextInt(int bound) {
-            return random.nextInt(bound);
-        }
-    }
-
-    private interface TxProvider {
-
-        DOMDataWriteTransaction createTransaction();
+    @Override
+    void runFailed(final Throwable cause) {
+        completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+            .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build());
     }
 
-    private static class TxChainBackedProvider implements TxProvider {
-
-        private final DOMTransactionChain transactionChain;
-
-        TxChainBackedProvider(final DOMDataBroker dataBroker,
-                              final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
-                              final ScheduledExecutorService executor) {
-
-            transactionChain =
-                    dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
-        }
+    @Override
+    void runSuccessful(final long allTx) {
+        final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
+                .setAllTx(allTx)
+                .setInsertTx(insertTx)
+                .setDeleteTx(deleteTx)
+                .build();
 
-        @Override
-        public DOMDataWriteTransaction createTransaction() {
-            return transactionChain.newWriteOnlyTransaction();
-        }
+        completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
+                .withResult(output).build());
     }
 
-    private static class DataBrokerBackedProvider implements TxProvider {
-
-        private final DOMDataBroker dataBroker;
-
-        DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
-            this.dataBroker = dataBroker;
-        }
-
-        @Override
-        public DOMDataWriteTransaction createTransaction() {
-            return dataBroker.newWriteOnlyTransaction();
-        }
+    @Override
+    void runTimedOut(final Exception cause) {
+        completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+            .withError(RpcError.ErrorType.APPLICATION,
+                    "Final submit was timed out by the test provider or was interrupted", cause).build());
     }
 
-    private static class TestChainListener implements TransactionChainListener {
-
-        private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
-        private final ScheduledExecutorService executor;
-
-        TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
-                          final ScheduledExecutorService executor) {
-
-            this.resultFuture = resultFuture;
-            this.executor = executor;
-        }
-
-        @Override
-        public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
-                                             final AsyncTransaction<?, ?> transaction,
-                                             final Throwable cause) {
-            LOG.warn("Transaction chain failed.", cause);
-            resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
-
-            executor.shutdown();
-        }
+    abstract DOMDataWriteTransaction createTransaction();
 
-        @Override
-        public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
-            LOG.debug("Transaction chain closed successfully.");
-        }
-    }
+    abstract int nextInt(int bound);
 }
index 38e0596a07a30028b38877283638d9f678482815..c6744a9393faf96a0a8e643e7258b03516ee3c59 100644 (file)
@@ -23,10 +23,10 @@ public class YnlListener implements OdlMdsalLowlevelTargetListener {
 
     private final String id;
 
-    private AtomicLong localNumber = new AtomicLong();
-    private AtomicLong allNot = new AtomicLong();
-    private AtomicLong idNot = new AtomicLong();
-    private AtomicLong errNot = new AtomicLong();
+    private final AtomicLong localNumber = new AtomicLong();
+    private final AtomicLong allNot = new AtomicLong();
+    private final AtomicLong idNot = new AtomicLong();
+    private final AtomicLong errNot = new AtomicLong();
 
     public YnlListener(final String id) {
         Preconditions.checkNotNull(id);
@@ -42,14 +42,13 @@ public class YnlListener implements OdlMdsalLowlevelTargetListener {
         if (notification.getId().equals(id)) {
             idNot.incrementAndGet();
 
-            localNumber.getAndUpdate((value -> {
+            localNumber.getAndUpdate(value -> {
                 if (notification.getSequenceNumber() - value == 1) {
                     return value + 1;
-                } else {
-                    errNot.getAndIncrement();
-                    return value;
                 }
-            }));
+                errNot.getAndIncrement();
+                return value;
+            });
         }
     }