BUG-8494: Cleanup clustering-it-provider 46/59846/2
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 23 Jun 2017 11:02:40 +0000 (13:02 +0200)
committerRobert Varga <nite@hq.sk>
Mon, 3 Jul 2017 11:08:18 +0000 (11:08 +0000)
Fixes various warnings and refactors MdsalLowLevelTestProvider
to be slightly cleaner in terms of number of classes.

It also eliminates synchronous thread blocking on future collection
and instead schedules task which performs the cleanup if the system
gets stuck.

Change-Id: I657f3df60c620284538bdf39ab1536eac8448801
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit d97061af6814ad7b085af10797a252aa4aa5cda6)

13 files changed:
opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataChangeListener.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataTreeChangeListener.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarEntryDataTreeCommitCohort.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/PeopleProvider.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java [new file with mode: 0644]
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/RoutedGetConstantService.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/YnlListener.java

index 89d220b..aa2f2da 100644 (file)
@@ -227,6 +227,37 @@ module odl-mdsal-lowlevel-control {
         }
     }
 
+    grouping transactions-params {
+        leaf seconds {
+            description "This RPC has to work (roughly) this long.";
+            mandatory true;
+            type uint32;
+        }
+        leaf transactions-per-second {
+            description "An upper limit of transactions per second this RPC shall try to achieve.";
+            mandatory true;
+            type uint32;
+        }
+    }
+
+    grouping transactions-result {
+        leaf all-tx {
+            description "Number of all transactions executed.";
+            type int64;
+            mandatory true;
+        }
+        leaf insert-tx {
+            description "Number of transactions that inserted data.";
+            type int64;
+            mandatory true;
+        }
+        leaf delete-tx {
+            description "Number of transactions that deleted data.";
+            type int64;
+            mandatory true;
+        }
+    }
+
     rpc write-transactions {
         description "Upon receiving this, the member shall make sure the outer list item
             of llt:id-ints exists for the given id, and then start creating (one by one)
@@ -245,16 +276,7 @@ module odl-mdsal-lowlevel-control {
             OptimisticLockException is always considered an error.";
         input {
             uses llc:id-grouping;
-            leaf seconds {
-                description "This RPC has to work (roughly) this long.";
-                mandatory true;
-                type uint32;
-            }
-            leaf transactions-per-second {
-                description "An upper limit of transactions per second this RPC shall try to achieve.";
-                mandatory true;
-                type uint32;
-            }
+            uses transactions-params;
             leaf chained-transactions {
                 description "If true, write transactions shall be created on a transaction chain,
                     (created at start of the RPC call, and deleted at at its end).
@@ -264,21 +286,7 @@ module odl-mdsal-lowlevel-control {
             }
         }
         output {
-            leaf all-tx {
-                description "Number of all transactions executed.";
-                type int64;
-                mandatory true;
-            }
-            leaf insert-tx {
-                description "Number of transactions that inserted data.";
-                type int64;
-                mandatory true;
-            }
-            leaf delete-tx {
-                description "Number of transactions that deleted data.";
-                type int64;
-                mandatory true;
-            }
+            uses transactions-result;
         }
     }
 
@@ -305,16 +313,7 @@ module odl-mdsal-lowlevel-control {
             but the shard and the whole id item shall be kept as they are.";
         input {
             uses llc:id-grouping;
-            leaf seconds {
-                description "This RPC has to work (roughly) this long.";
-                mandatory true;
-                type uint32;
-            }
-            leaf transactions-per-second {
-                description "An upper limit of transactions per second this RPC shall try to achieve.";
-                mandatory true;
-                type uint32;
-            }
+            uses transactions-params;
             leaf isolated-transactions {
                 description "The value for DOMDataTreeProducer#createTransaction argument.";
                 mandatory true;
@@ -322,21 +321,7 @@ module odl-mdsal-lowlevel-control {
             }
         }
         output {
-            leaf all-tx {
-                description "Number of all transactions executed.";
-                type int64;
-                mandatory true;
-            }
-            leaf insert-tx {
-                description "Number of transactions that inserted data.";
-                type int64;
-                mandatory true;
-            }
-            leaf delete-tx {
-                description "Number of transactions that deleted data.";
-                type int64;
-                mandatory true;
-            }
+            uses transactions-result;
         }
     }
 
index ac5c368..22def4c 100644 (file)
@@ -27,14 +27,14 @@ public class CarDataChangeListener implements DataChangeListener {
     private static final Logger LOG = LoggerFactory.getLogger(CarDataChangeListener.class);
 
     @Override
-    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+    public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("onDataChanged invoked");
             outputChanges(change);
         }
     }
 
-    private void outputChanges(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+    private static void outputChanges(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
         final Map<InstanceIdentifier<?>, DataObject> originalData = change.getOriginalData() != null ?
                 change.getOriginalData(): Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
         final Map<InstanceIdentifier<?>, DataObject> updatedData = change.getUpdatedData() != null ?
index c157b6c..a10e22f 100644 (file)
@@ -8,13 +8,9 @@
 
 package org.opendaylight.controller.clustering.it.provider;
 
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.DELETE;
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED;
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.WRITE;
-
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
@@ -32,7 +28,7 @@ public class CarDataTreeChangeListener implements DataTreeChangeListener<Cars> {
     private static final Logger LOG = LoggerFactory.getLogger(CarDataTreeChangeListener.class);
 
     @java.lang.Override
-    public void onDataTreeChanged(@Nonnull java.util.Collection<DataTreeModification<Cars>> changes) {
+    public void onDataTreeChanged(@Nonnull final java.util.Collection<DataTreeModification<Cars>> changes) {
         if (LOG.isTraceEnabled()) {
             for (DataTreeModification<Cars> change : changes) {
                 ouputChanges(change);
@@ -40,7 +36,7 @@ public class CarDataTreeChangeListener implements DataTreeChangeListener<Cars> {
         }
     }
 
-    private void ouputChanges(final DataTreeModification<Cars> change) {
+    private static void ouputChanges(final DataTreeModification<Cars> change) {
         final DataObjectModification<Cars> rootNode = change.getRootNode();
         final ModificationType modificationType = rootNode.getModificationType();
         final InstanceIdentifier<Cars> rootIdentifier = change.getRootPath().getRootIdentifier();
index 1bc6581..2700159 100644 (file)
@@ -39,10 +39,9 @@ public class CarEntryDataTreeCommitCohort implements DOMDataTreeCommitCohort {
     private static final QName YEAR_QNAME = QName.create(Cars.QNAME, "year").intern();
     private static final NodeIdentifier YEAR_NODE_ID = new NodeIdentifier(YEAR_QNAME);
 
-    @SuppressWarnings("unchecked")
     @Override
-    public CheckedFuture<PostCanCommitStep, DataValidationFailedException> canCommit(Object txId,
-            DOMDataTreeCandidate candidate, SchemaContext ctx) {
+    public CheckedFuture<PostCanCommitStep, DataValidationFailedException> canCommit(final Object txId,
+            final DOMDataTreeCandidate candidate, final SchemaContext ctx) {
 
         // Simple data validation - verify the year, if present, is >= 1990
 
@@ -77,6 +76,6 @@ public class CarEntryDataTreeCommitCohort implements DOMDataTreeCommitCohort {
 
         // Return the noop PostCanCommitStep as we're only validating input data and not participating in the
         // remaining 3PC stages (pre-commit and commit).
-        return (CheckedFuture<PostCanCommitStep, DataValidationFailedException>) PostCanCommitStep.NOOP_SUCCESS_FUTURE;
+        return PostCanCommitStep.NOOP_SUCCESS_FUTURE;
     }
 }
index ec4c1bb..ea9f7b1 100644 (file)
@@ -20,7 +20,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
@@ -63,7 +62,7 @@ public class CarProvider implements CarService {
     private static final Logger LOG_CAR_PROVIDER = LoggerFactory.getLogger(CarProvider.class);
 
     private static final String ENTITY_TYPE = "cars";
-    private static final InstanceIdentifier CARS_IID = InstanceIdentifier.builder(Cars.class).build();
+    private static final InstanceIdentifier<Cars> CARS_IID = InstanceIdentifier.builder(Cars.class).build();
     private static final DataTreeIdentifier<Cars> CARS_DTID = new DataTreeIdentifier<>(
             LogicalDatastoreType.CONFIGURATION, CARS_IID);
 
@@ -76,7 +75,7 @@ public class CarProvider implements CarService {
     private final CarEntityOwnershipListener ownershipListener = new CarEntityOwnershipListener();
     private final AtomicBoolean registeredListener = new AtomicBoolean();
 
-    private final Collection<ListenerRegistration<DataChangeListener>> carsDclRegistrations =
+    private final Collection<ListenerRegistration<?>> carsDclRegistrations =
             Sets.newConcurrentHashSet();
     private final Collection<ListenerRegistration<CarDataTreeChangeListener>> carsDtclRegistrations =
             Sets.newConcurrentHashSet();
@@ -86,8 +85,8 @@ public class CarProvider implements CarService {
     private final AtomicReference<DOMDataTreeCommitCohortRegistration<CarEntryDataTreeCommitCohort>> commitCohortReg =
             new AtomicReference<>();
 
-    public CarProvider(DataBroker dataProvider, EntityOwnershipService ownershipService,
-            DOMDataBroker domDataBroker) {
+    public CarProvider(final DataBroker dataProvider, final EntityOwnershipService ownershipService,
+            final DOMDataBroker domDataBroker) {
         this.dataProvider = dataProvider;
         this.ownershipService = ownershipService;
         this.domDataBroker = domDataBroker;
@@ -110,7 +109,7 @@ public class CarProvider implements CarService {
     }
 
     @Override
-    public Future<RpcResult<Void>> stressTest(StressTestInput input) {
+    public Future<RpcResult<Void>> stressTest(final StressTestInput input) {
         final int inputRate;
         final long inputCount;
 
@@ -120,10 +119,9 @@ public class CarProvider implements CarService {
             return Futures.immediateFuture(RpcResultBuilder.<Void>failed()
                     .withError(ErrorType.PROTOCOL, "invalid rate")
                     .build());
-        } else {
-            inputRate = input.getRate();
         }
 
+        inputRate = input.getRate();
         if (input.getCount() != null) {
             inputCount = input.getCount();
         } else {
@@ -218,7 +216,7 @@ public class CarProvider implements CarService {
 
 
     @Override
-    public Future<RpcResult<Void>> registerOwnership(RegisterOwnershipInput input) {
+    public Future<RpcResult<Void>> registerOwnership(final RegisterOwnershipInput input) {
         if(registeredListener.compareAndSet(false, true)) {
             ownershipService.registerListener(ENTITY_TYPE, ownershipListener);
         }
@@ -235,13 +233,13 @@ public class CarProvider implements CarService {
     }
 
     @Override
-    public Future<RpcResult<Void>> unregisterOwnership(UnregisterOwnershipInput input) {
+    public Future<RpcResult<Void>> unregisterOwnership(final UnregisterOwnershipInput input) {
         return RpcResultBuilder.<Void>success().buildFuture();
     }
 
     private static class CarEntityOwnershipListener implements EntityOwnershipListener {
         @Override
-        public void ownershipChanged(EntityOwnershipChange ownershipChange) {
+        public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
             LOG_CAR_PROVIDER.info("ownershipChanged: {}", ownershipChange);
         }
     }
@@ -278,7 +276,7 @@ public class CarProvider implements CarService {
         LOG_CAR_PROVIDER.info("Unregistering the CarDataChangeListener(s)");
         synchronized (carsDclRegistrations) {
             int numListeners = 0;
-            for (ListenerRegistration<DataChangeListener> carsDclRegistration : carsDclRegistrations) {
+            for (ListenerRegistration<?> carsDclRegistration : carsDclRegistrations) {
                 carsDclRegistration.close();
                 numListeners++;
             }
index a93d99f..e0e8d99 100644 (file)
@@ -137,17 +137,17 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     private final DOMDataTreeChangeService domDataTreeChangeService;
     private final ActorSystem actorSystem;
 
-    private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
+    private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
             new HashMap<>();
 
-    private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
+    private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
 
     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
     private FlappingSingletonService flappingSingletonService;
     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
     private IdIntsListener idIntsListener;
-    private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
+    private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
     private IdIntsDOMDataTreeLIstener idIntsDdtl;
 
@@ -252,13 +252,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     @Override
     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
         LOG.debug("write-transactions, input: {}", input);
-
-        final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
-
-        final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
-        writeTransactionsHandler.start(settableFuture);
-
-        return settableFuture;
+        return WriteTransactionsHandler.start(domDataBroker, input);
     }
 
     @Override
@@ -267,7 +261,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+    public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
         return null;
     }
 
@@ -339,7 +333,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
+    public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
         return null;
     }
 
@@ -381,7 +375,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
+    public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
         return null;
     }
 
@@ -573,14 +567,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     @Override
     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
         LOG.debug("producer-transactions, input: {}", input);
-
-        final ProduceTransactionsHandler handler =
-                new ProduceTransactionsHandler(domDataTreeService, input);
-
-        final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
-        handler.start(settableFuture);
-
-        return settableFuture;
+        return ProduceTransactionsHandler.start(domDataTreeService, input);
     }
 
     @Override
@@ -601,7 +588,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
 
-        final InstanceIdentifier shardPrefix = input.getPrefix();
+        final InstanceIdentifier<?> shardPrefix = input.getPrefix();
 
         if (shardPrefix == null) {
             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
index 48385b4..0982283 100644 (file)
@@ -43,12 +43,12 @@ public class PeopleProvider implements PeopleService, AutoCloseable {
   }
 
 
-  public void setRpcRegistration(BindingAwareBroker.RoutedRpcRegistration<CarPurchaseService> rpcRegistration) {
+  public void setRpcRegistration(final BindingAwareBroker.RoutedRpcRegistration<CarPurchaseService> rpcRegistration) {
     this.rpcRegistration = rpcRegistration;
   }
 
   @Override
-  public Future<RpcResult<Void>> addPerson(AddPersonInput input) {
+  public Future<RpcResult<Void>> addPerson(final AddPersonInput input) {
     LOG.info("RPC addPerson : adding person [{}]", input);
 
     PersonBuilder builder = new PersonBuilder(input);
@@ -59,7 +59,7 @@ public class PeopleProvider implements PeopleService, AutoCloseable {
     final InstanceIdentifier.InstanceIdentifierBuilder<Person> personIdBuilder =
         InstanceIdentifier.<People>builder(People.class)
             .child(Person.class, person.getKey());
-    final InstanceIdentifier personId = personIdBuilder.build();
+    final InstanceIdentifier<Person> personId = personIdBuilder.build();
     // Place entry in data store tree
     WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
     tx.put(LogicalDatastoreType.CONFIGURATION, personId, person, true);
diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java
new file mode 100644 (file)
index 0000000..f23e7ec
--- /dev/null
@@ -0,0 +1,224 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.clustering.it.provider.impl;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class AbstractTransactionHandler {
+    private abstract static class Phase {
+        abstract void txSuccess(ListenableFuture<Void> execFuture, long txId);
+
+        abstract void txFailure(ListenableFuture<Void> execFuture, long txId, Throwable cause);
+    }
+
+    private static final class Running extends Phase {
+        private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
+        private Throwable failure;
+
+        void addFuture(final ListenableFuture<Void> execFuture) {
+            futures.add(execFuture);
+        }
+
+        @Override
+        void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+            futures.remove(execFuture);
+        }
+
+        @Override
+        void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+            futures.remove(execFuture);
+            if (failure != null) {
+                failure = cause;
+            }
+        }
+
+        Optional<Throwable> getFailure() {
+            return Optional.ofNullable(failure);
+        }
+    }
+
+    private final class Collecting extends Phase {
+        private final List<ListenableFuture<Void>> futures;
+        private boolean done;
+
+        Collecting(final Collection<ListenableFuture<Void>> futures) {
+            this.futures = new ArrayList<>(futures);
+        }
+
+        @Override
+        void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+            futures.remove(execFuture);
+            if (futures.isEmpty() && !done) {
+                LOG.debug("All futures completed successfully.");
+                runSuccessful(txCounter);
+            }
+        }
+
+        @Override
+        void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+            futures.remove(execFuture);
+            done = true;
+            runFailed(cause);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
+
+    static final int SECOND_AS_NANO = 1000000000;
+    //2^20 as in the model
+    static final int MAX_ITEM = 1048576;
+
+    static final QName ID_INTS =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
+    static final QName ID =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
+    static final QName ITEM =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
+    static final QName NUMBER =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
+
+    public static final QName ID_INT =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
+    public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
+    public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
+
+    static final long INIT_TX_TIMEOUT_SECONDS = 125;
+
+    private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
+
+    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    private final Stopwatch stopwatch = Stopwatch.createStarted();
+    private final long runtimeNanos;
+    private final long delayNanos;
+
+    private ScheduledFuture<?> scheduledFuture;
+    private long txCounter;
+    @GuardedBy("this")
+    private Phase phase;
+
+    AbstractTransactionHandler(final TransactionsParams params) {
+        runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
+        delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond();
+    }
+
+    final synchronized void doStart() {
+        phase = new Running();
+        scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
+    }
+
+    private void execute() {
+        final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+        if (elapsed < runtimeNanos) {
+            // Not completed yet: create a transaction and hook it up
+            final long txId = txCounter++;
+            final ListenableFuture<Void> execFuture = execWrite(txId);
+
+            // Ordering is important: we need to add the future before hooking the callback
+            synchronized (this) {
+                ((Running) phase).addFuture(execFuture);
+            }
+            Futures.addCallback(execFuture, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(final Void result) {
+                    txSuccess(execFuture, txId);
+                }
+
+                @Override
+                public void onFailure(final Throwable cause) {
+                    txFailure(execFuture, txId, cause);
+                }
+            });
+        } else {
+            startCollection();
+        }
+    }
+
+    private synchronized void startCollection() {
+        scheduledFuture.cancel(false);
+
+        final Running running = (Running) phase;
+        final Optional<Throwable> failure = running.getFailure();
+        if (failure.isPresent()) {
+            executor.shutdown();
+            runFailed(failure.get());
+            return;
+        }
+
+        LOG.debug("Reached maximum run time with {} outstanding futures", running.futures.size());
+        if (running.futures.isEmpty()) {
+            executor.shutdown();
+            runSuccessful(txCounter);
+            return;
+        }
+
+        phase = new Collecting(running.futures);
+        executor.schedule(this::checkCollection, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        executor.shutdown();
+    }
+
+    final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+        LOG.debug("Future #{} completed successfully", txId);
+        phase.txSuccess(execFuture, txId);
+    }
+
+    final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+        LOG.debug("Future #{} failed", txId, cause);
+        phase.txFailure(execFuture, txId, cause);
+    }
+
+    private synchronized void checkCollection() {
+        final Collecting collecting = (Collecting) phase;
+        if (!collecting.done) {
+            final int size = collecting.futures.size();
+            for (int i = 0; i < size; i++) {
+                final ListenableFuture<Void> future = collecting.futures.get(i);
+
+                try {
+                    future.get(0, TimeUnit.NANOSECONDS);
+                } catch (final TimeoutException e) {
+                    LOG.warn("Future #{}/{} not completed yet", i, size);
+                } catch (final ExecutionException e) {
+                    LOG.warn("Future #{}/{} failed", i, size, e.getCause());
+                } catch (final InterruptedException e) {
+                    LOG.warn("Interrupted while examining future #{}/{}", i, size, e);
+                }
+            }
+
+            runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
+        }
+    }
+
+    abstract ListenableFuture<Void> execWrite(final long txId);
+
+    abstract void runFailed(Throwable cause);
+
+    abstract void runSuccessful(long allTx);
+
+    abstract void runTimedOut(Exception cause);
+}
index 8f711b3..4ffb415 100644 (file)
@@ -8,10 +8,10 @@
 
 package org.opendaylight.controller.clustering.it.provider.impl;
 
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID;
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID_INT;
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID_INTS;
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ITEM;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INT;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INTS;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ITEM;
 
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
@@ -38,7 +38,6 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -94,13 +93,13 @@ public class PrefixShardHandler {
                 final CheckedFuture<Void, TransactionCommitFailedException> ensureFuture = ensureListExists();
                 Futures.addCallback(ensureFuture, new FutureCallback<Void>() {
                     @Override
-                    public void onSuccess(@Nullable Void result) {
+                    public void onSuccess(@Nullable final Void result) {
                         LOG.debug("Initial list write successful.");
                         future.set(RpcResultBuilder.<Void>success().build());
                     }
 
                     @Override
-                    public void onFailure(Throwable throwable) {
+                    public void onFailure(final Throwable throwable) {
                         LOG.warn("Shard[{}] creation failed:", identifier, throwable);
 
                         final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed",
@@ -189,7 +188,7 @@ public class PrefixShardHandler {
         final CheckedFuture<Void, TransactionCommitFailedException> future = tx.submit();
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
-            public void onSuccess(@Nullable Void result) {
+            public void onSuccess(@Nullable final Void result) {
                 try {
                     LOG.debug("Closing producer for initial list.");
                     producer.close();
@@ -199,7 +198,7 @@ public class PrefixShardHandler {
             }
 
             @Override
-            public void onFailure(Throwable throwable) {
+            public void onFailure(final Throwable throwable) {
                 //NOOP handled by the caller of this method.
             }
         });
index 2a1f5ae..b1348c2 100644 (file)
@@ -8,23 +8,15 @@
 
 package org.opendaylight.controller.clustering.it.provider.impl;
 
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.SplittableRandom;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
@@ -35,7 +27,6 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -47,81 +38,34 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ProduceTransactionsHandler implements Runnable {
-
+public class ProduceTransactionsHandler extends AbstractTransactionHandler {
     private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
-    private static final int SECOND_AS_NANO = 1000000000;
-    //2^20 as in the model
-    private static final int MAX_ITEM = 1048576;
-
-    static final QName ID_INTS =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
-    public static final QName ID_INT =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
-    static final QName ID =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
-    static final QName ITEM =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
-    private static final QName NUMBER =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number".intern());
-
-    public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
-    public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
 
-    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    private final List<ListenableFuture<Void>> futures = new ArrayList<>();
-    private final Set<Integer> usedValues = new HashSet<>();
+    private final SettableFuture<RpcResult<ProduceTransactionsOutput>> future = SettableFuture.create();
     private final SplittableRandom random = new SplittableRandom();
+    private final Set<Integer> usedValues = new HashSet<>();
+    private final DOMDataTreeIdentifier idListItem;
+    private final DOMDataTreeProducer itemProducer;
 
-    private final DOMDataTreeService domDataTreeService;
-    private final long runtimeNanos;
-    private final long delayNanos;
-    private final String id;
-
-    private SettableFuture<RpcResult<ProduceTransactionsOutput>> completionFuture;
-    private Stopwatch stopwatch;
-
-    private long allTx = 0;
     private long insertTx = 0;
     private long deleteTx = 0;
-    private ScheduledFuture<?> scheduledFuture;
-    private DOMDataTreeProducer itemProducer;
-    private DOMDataTreeIdentifier idListItem;
-
-    public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService,
-                                      final ProduceTransactionsInput input) {
-
-        this.domDataTreeService = domDataTreeService;
 
-        runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds());
-        delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond();
-        id = input.getId();
-    }
-
-    @Override
-    public void run() {
-        futures.add(execWrite(futures.size()));
-        maybeFinish();
+    private ProduceTransactionsHandler(final DOMDataTreeProducer producer, final DOMDataTreeIdentifier idListItem,
+            final ProduceTransactionsInput input) {
+        super(input);
+        this.itemProducer = Preconditions.checkNotNull(producer);
+        this.idListItem = Preconditions.checkNotNull(idListItem);
     }
 
-    public void start(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
-        completionFuture = settableFuture;
-
-        if (fillInitialList(completionFuture)) {
-            stopwatch = Stopwatch.createStarted();
-            scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS);
-        } else {
-            executor.shutdown();
-        }
-    }
-
-    private boolean fillInitialList(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
-        LOG.debug("Filling the item list with initial values.");
+    public static ListenableFuture<RpcResult<ProduceTransactionsOutput>> start(
+            final DOMDataTreeService domDataTreeService, final ProduceTransactionsInput input) {
+        final String id = input.getId();
+        LOG.debug("Filling the item list {} with initial values.", id);
 
         final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id));
 
-        itemProducer = domDataTreeService.createProducer(
-                Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
+        final DOMDataTreeProducer itemProducer = domDataTreeService.createProducer(
+            Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
 
         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
         final DOMDataTreeWriteCursor cursor =
@@ -131,33 +75,34 @@ public class ProduceTransactionsHandler implements Runnable {
         cursor.write(list.getIdentifier(), list);
         cursor.close();
 
-        idListItem = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
-            idListWithKey.node(list.getIdentifier()).toOptimized());
-
         try {
-            tx.submit().checkedGet(125, TimeUnit.SECONDS);
-            return true;
+            tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         } catch (final Exception e) {
             LOG.warn("Unable to fill the initial item list.", e);
-            settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
-        }
 
-        try {
-            itemProducer.close();
-        } catch (final DOMDataTreeProducerException exception) {
-            LOG.warn("Failure while closing producer.", exception);
+            try {
+                itemProducer.close();
+            } catch (final DOMDataTreeProducerException exception) {
+                LOG.warn("Failure while closing producer.", exception);
+            }
+
+            return Futures.immediateFuture(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+                .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
         }
-        return false;
+
+        final ProduceTransactionsHandler handler = new ProduceTransactionsHandler(itemProducer,
+            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(list.getIdentifier())
+                .toOptimized()), input);
+        handler.doStart();
+        return handler.future;
     }
 
-    private ListenableFuture<Void> execWrite(final int offset) {
+    @Override
+    ListenableFuture<Void> execWrite(final long txId) {
         final int i = random.nextInt(MAX_ITEM + 1);
         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
         final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem);
 
-        allTx++;
-
         final NodeIdentifierWithPredicates entryId = new NodeIdentifierWithPredicates(ITEM, NUMBER, i);
         if (usedValues.contains(i)) {
             LOG.debug("Deleting item: {}", i);
@@ -177,80 +122,30 @@ public class ProduceTransactionsHandler implements Runnable {
 
         cursor.close();
 
-        final ListenableFuture<Void> future = tx.submit();
-        if (LOG.isDebugEnabled()) {
-            Futures.addCallback(future, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(final Void result) {
-                    LOG.debug("Future #{} completed successfully", offset);
-                }
-
-                @Override
-                public void onFailure(final Throwable cause) {
-                    LOG.debug("Future #{} failed", offset, cause);
-                }
-            });
-        }
-
-        return future;
+        return tx.submit();
     }
 
-    private void maybeFinish() {
-        final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
-        if (elapsed >= runtimeNanos) {
-            LOG.debug("Reached max running time, waiting for futures to complete.");
-            scheduledFuture.cancel(false);
-
-            final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
-
-            try {
-                // Timeout from cds should be 2 minutes so leave some leeway.
-                allFutures.get(125, TimeUnit.SECONDS);
-
-                LOG.debug("All futures completed successfully.");
-
-                final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
-                        .setAllTx(allTx)
-                        .setInsertTx(insertTx)
-                        .setDeleteTx(deleteTx)
-                        .build();
-                completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
-                        .withResult(output).build());
-            } catch (ExecutionException e) {
-                LOG.error("Write transactions failed.", e.getCause());
-                completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build());
-            } catch (InterruptedException | TimeoutException e) {
-                LOG.error("Write transactions failed.", e);
-                completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION,
-                                "Final submit was timed out by the test provider or was interrupted", e).build());
-
-                for (int i = 0; i < futures.size(); i++) {
-                    final ListenableFuture<Void> future = futures.get(i);
+    @Override
+    void runFailed(final Throwable cause) {
+        future.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+            .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build());
+    }
 
-                    try {
-                        future.get(0, TimeUnit.NANOSECONDS);
-                    } catch (TimeoutException fe) {
-                        LOG.warn("Future #{}/{} not completed yet", i, futures.size());
-                    } catch (ExecutionException fe) {
-                        LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause());
-                    } catch (InterruptedException fe) {
-                        LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e);
-                    }
-                }
-            } catch (Exception e) {
-                LOG.error("Write transactions failed.", e);
-                completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
-            }
+    @Override
+    void runSuccessful(final long allTx) {
+        final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
+                .setAllTx(allTx)
+                .setInsertTx(insertTx)
+                .setDeleteTx(deleteTx)
+                .build();
+        future.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
+                .withResult(output).build());
+    }
 
-            executor.shutdown();
-            try {
-                itemProducer.close();
-            } catch (final DOMDataTreeProducerException e) {
-                LOG.warn("Failure while closing item producer.", e);
-            }
-        }
+    @Override
+    void runTimedOut(final Exception cause) {
+        future.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+            .withError(RpcError.ErrorType.APPLICATION,
+                    "Final submit was timed out by the test provider or was interrupted", cause).build());
     }
 }
index b4bc304..92b741d 100644 (file)
@@ -12,7 +12,6 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
index a026e6f..797e252 100644 (file)
@@ -8,22 +8,14 @@
 
 package org.opendaylight.controller.clustering.it.provider.impl;
 
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.SplittableRandom;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
@@ -38,7 +30,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -53,155 +44,163 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableCo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class WriteTransactionsHandler implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
-    private static final int SECOND_AS_NANO = 1000000000;
-    //2^20 as in the model
-    private static final int MAX_ITEM = 1048576;
-
-    private static final QName ID_INTS =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
-    private static final QName ID_INT =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
-    private static final QName ID =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
-    private static final QName ITEM =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
-    private static final QName NUMBER =
-            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
-
-    public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
-    public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
-
-    private final WriteTransactionsInput input;
-
-    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    private final List<ListenableFuture<Void>> futures = new ArrayList<>();
-    private final Set<Integer> usedValues = new HashSet<>();
-
-    private RandomnessProvider random;
-    private TxProvider txProvider;
+public abstract class WriteTransactionsHandler extends AbstractTransactionHandler {
+    private static final class Chained extends WriteTransactionsHandler implements TransactionChainListener {
+        private final SplittableRandom random = new SplittableRandom();
+        private final DOMTransactionChain transactionChain;
 
-    private final DOMDataBroker domDataBroker;
-    private final Long runtimeNanos;
-    private final Long delayNanos;
-    private final String id;
+        Chained(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem,
+            final WriteTransactionsInput input) {
+            super(idListItem, input);
+            transactionChain = dataBroker.createTransactionChain(this);
+        }
 
-    private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
-    private Stopwatch stopwatch;
+        @Override
+        DOMDataWriteTransaction createTransaction() {
+            return transactionChain.newWriteOnlyTransaction();
+        }
 
-    private long allTx = 0;
-    private long insertTx = 0;
-    private long deleteTx = 0;
-    private ScheduledFuture<?> scheduledFuture;
-    private YangInstanceIdentifier idListItem;
+        @Override
+        int nextInt(final int bound) {
+            return random.nextInt(bound);
+        }
 
-    public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
-        this.domDataBroker = domDataBroker;
-        this.input = input;
+        @Override
+        public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
+                final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+            LOG.warn("Transaction chain failed.", cause);
+            completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
+        }
 
-        runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds());
-        delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond();
-        id = input.getId();
+        @Override
+        public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+            LOG.debug("Transaction chain closed successfully.");
+        }
     }
 
-    @Override
-    public void run() {
-        futures.add(execWrite(futures.size()));
-        maybeFinish();
-    }
+    private static final class Simple extends WriteTransactionsHandler {
+        private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
+        private final SplittableRandom random = new SplittableRandom();
+        private final DOMDataBroker dataBroker;
 
-    public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
-        LOG.debug("Starting write-transactions.");
+        Simple(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem,
+            final WriteTransactionsInput input) {
+            super(idListItem, input);
+            this.dataBroker = Preconditions.checkNotNull(dataBroker);
+        }
 
-        if (input.isChainedTransactions()) {
-            txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
-            random = new BasicProvider();
-        } else {
-            txProvider = new DataBrokerBackedProvider(domDataBroker);
-            random = new NonConflictingProvider();
+        @Override
+        DOMDataWriteTransaction createTransaction() {
+            return dataBroker.newWriteOnlyTransaction();
         }
 
-        if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
-            stopwatch = Stopwatch.createStarted();
-            completionFuture = settableFuture;
-            scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS);
-        } else {
-            executor.shutdown();
+        @Override
+        int nextInt(final int bound) {
+            int nextInt;
+            do {
+                nextInt = random.nextInt(bound);
+            } while (previousNumbers.contains(nextInt));
+
+            if (previousNumbers.size() > 100000) {
+                previousNumbers.iterator().remove();
+            }
+            previousNumbers.add(nextInt);
+
+            return nextInt;
         }
     }
 
-    private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
+    private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
+
+    final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture = SettableFuture.create();
+    private final Set<Integer> usedValues = new HashSet<>();
+    private final YangInstanceIdentifier idListItem;
+
+    private long insertTx = 0;
+    private long deleteTx = 0;
+
+    WriteTransactionsHandler(final YangInstanceIdentifier idListItem, final WriteTransactionsInput input) {
+        super(input);
+        this.idListItem = Preconditions.checkNotNull(idListItem);
+    }
+
+    public static ListenableFuture<RpcResult<WriteTransactionsOutput>> start(final DOMDataBroker domDataBroker,
+            final WriteTransactionsInput input) {
+        LOG.debug("Starting write-transactions.");
+
+        final String id = input.getId();
+        final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
+                .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
+                .build();
+        final YangInstanceIdentifier idListItem = ID_INT_YID.node(entry.getIdentifier());
 
         final ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
                 .withNodeIdentifier(new NodeIdentifier(ID_INTS))
                 .withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build())
                 .build();
 
-        DOMDataWriteTransaction tx = txProvider.createTransaction();
+        DOMDataWriteTransaction tx = domDataBroker.newWriteOnlyTransaction();
         // write only the top list
         tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode);
         try {
-            tx.submit().checkedGet(125, TimeUnit.SECONDS);
+            tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         } catch (final OptimisticLockFailedException e) {
             // when multiple write-transactions are executed concurrently we need to ignore this.
             // If we get optimistic lock here it means id-ints already exists and we can continue.
             LOG.debug("Got an optimistic lock when writing initial top level list element.", e);
         } catch (final TransactionCommitFailedException | TimeoutException e) {
             LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
-            settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+            return Futures.immediateFuture(RpcResultBuilder.<WriteTransactionsOutput>failed()
                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
-            return false;
         }
 
-        final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
-                .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
-                .build();
-
-        idListItem = ID_INT_YID.node(entry.getIdentifier());
-        tx = txProvider.createTransaction();
+        tx = domDataBroker.newWriteOnlyTransaction();
         tx.merge(LogicalDatastoreType.CONFIGURATION, idListItem, entry);
 
         try {
-            tx.submit().checkedGet(125, TimeUnit.SECONDS);
-            return true;
+            tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         } catch (final Exception e) {
             LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
-            settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+            return Futures.immediateFuture(RpcResultBuilder.<WriteTransactionsOutput>failed()
                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
-            return false;
         }
-    }
 
-    private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
         LOG.debug("Filling the item list with initial values.");
 
         final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
 
         final YangInstanceIdentifier itemListId = idListItem.node(ITEM);
-        final DOMDataWriteTransaction tx = txProvider.createTransaction();
+        tx = domDataBroker.newWriteOnlyTransaction();
         tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
 
         try {
-            tx.submit().checkedGet(125, TimeUnit.SECONDS);
-            return true;
+            tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         } catch (final Exception e) {
             LOG.warn("Unable to fill the initial item list.", e);
-            settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+            return Futures.immediateFuture(RpcResultBuilder.<WriteTransactionsOutput>failed()
                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
-            return false;
         }
+
+        final WriteTransactionsHandler handler;
+        if (input.isChainedTransactions()) {
+            handler = new Chained(domDataBroker, idListItem, input);
+        } else {
+            handler = new Simple(domDataBroker, idListItem, input);
+        }
+
+        handler.doStart();
+        return handler.completionFuture;
     }
 
-    private ListenableFuture<Void> execWrite(final int offset) {
-        final int i = random.nextInt(MAX_ITEM + 1);
+    @Override
+    ListenableFuture<Void> execWrite(final long txId) {
+        final int i = nextInt(MAX_ITEM + 1);
 
         final YangInstanceIdentifier entryId =
                 idListItem.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
 
-        final DOMDataWriteTransaction tx = txProvider.createTransaction();
-        allTx++;
+        final DOMDataWriteTransaction tx = createTransaction();
 
         if (usedValues.contains(i)) {
             LOG.debug("Deleting item: {}", i);
@@ -217,181 +216,35 @@ public class WriteTransactionsHandler implements Runnable {
             usedValues.add(i);
         }
 
-        final ListenableFuture<Void> future = tx.submit();
-        if (LOG.isDebugEnabled()) {
-            Futures.addCallback(future, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(final Void result) {
-                    LOG.debug("Future #{} completed successfully", offset);
-                }
-
-                @Override
-                public void onFailure(final Throwable cause) {
-                    LOG.debug("Future #{} failed", offset, cause);
-                }
-            });
-        }
-
-        return future;
-    }
-
-    private void maybeFinish() {
-        final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
-        if (elapsed >= runtimeNanos) {
-            LOG.debug("Reached max running time, waiting for futures to complete.");
-            scheduledFuture.cancel(false);
-
-            final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
-
-            try {
-                // Timeout from cds should be 2 minutes so leave some leeway.
-                allFutures.get(125, TimeUnit.SECONDS);
-
-                LOG.debug("All futures completed successfully.");
-
-                final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
-                        .setAllTx(allTx)
-                        .setInsertTx(insertTx)
-                        .setDeleteTx(deleteTx)
-                        .build();
-
-                completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
-                        .withResult(output).build());
-
-                executor.shutdown();
-            } catch (final ExecutionException e) {
-                LOG.error("Write transactions failed.", e.getCause());
-
-                completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build());
-            } catch (InterruptedException | TimeoutException e) {
-                LOG.error("Write transactions failed.", e);
-
-                completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION,
-                                "Final submit was timed out by the test provider or was interrupted", e).build());
-
-                for (int i = 0; i < futures.size(); i++) {
-                    final ListenableFuture<Void> future = futures.get(i);
-
-                    try {
-                        future.get(0, TimeUnit.NANOSECONDS);
-                    } catch (final TimeoutException fe) {
-                        LOG.warn("Future #{}/{} not completed yet", i, futures.size());
-                    } catch (final ExecutionException fe) {
-                        LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause());
-                    } catch (final InterruptedException fe) {
-                        LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e);
-                    }
-                }
-            } catch (Exception exception) {
-                LOG.error("Write transactions failed.", exception);
-                completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build());
-
-                executor.shutdown();
-            }
-        }
-    }
-
-    private interface RandomnessProvider {
-        int nextInt(int bound);
-    }
-
-    private static class NonConflictingProvider implements RandomnessProvider {
-
-        private final SplittableRandom random = new SplittableRandom();
-        private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
-
-        @Override
-        public int nextInt(int bound) {
-            int nextInt;
-            do {
-                nextInt = random.nextInt(bound);
-            } while (previousNumbers.contains(nextInt));
-
-            if (previousNumbers.size() > 100000) {
-                previousNumbers.iterator().remove();
-            }
-            previousNumbers.add(nextInt);
-
-            return nextInt;
-        }
+        return tx.submit();
     }
 
-    private static class BasicProvider implements RandomnessProvider {
-
-        private final SplittableRandom random = new SplittableRandom();
-
-        @Override
-        public int nextInt(int bound) {
-            return random.nextInt(bound);
-        }
-    }
-
-    private interface TxProvider {
-
-        DOMDataWriteTransaction createTransaction();
+    @Override
+    void runFailed(final Throwable cause) {
+        completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+            .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build());
     }
 
-    private static class TxChainBackedProvider implements TxProvider {
-
-        private final DOMTransactionChain transactionChain;
-
-        TxChainBackedProvider(final DOMDataBroker dataBroker,
-                              final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
-                              final ScheduledExecutorService executor) {
-
-            transactionChain =
-                    dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
-        }
+    @Override
+    void runSuccessful(final long allTx) {
+        final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
+                .setAllTx(allTx)
+                .setInsertTx(insertTx)
+                .setDeleteTx(deleteTx)
+                .build();
 
-        @Override
-        public DOMDataWriteTransaction createTransaction() {
-            return transactionChain.newWriteOnlyTransaction();
-        }
+        completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
+                .withResult(output).build());
     }
 
-    private static class DataBrokerBackedProvider implements TxProvider {
-
-        private final DOMDataBroker dataBroker;
-
-        DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
-            this.dataBroker = dataBroker;
-        }
-
-        @Override
-        public DOMDataWriteTransaction createTransaction() {
-            return dataBroker.newWriteOnlyTransaction();
-        }
+    @Override
+    void runTimedOut(final Exception cause) {
+        completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+            .withError(RpcError.ErrorType.APPLICATION,
+                    "Final submit was timed out by the test provider or was interrupted", cause).build());
     }
 
-    private static class TestChainListener implements TransactionChainListener {
-
-        private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
-        private final ScheduledExecutorService executor;
-
-        TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
-                          final ScheduledExecutorService executor) {
-
-            this.resultFuture = resultFuture;
-            this.executor = executor;
-        }
-
-        @Override
-        public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
-                                             final AsyncTransaction<?, ?> transaction,
-                                             final Throwable cause) {
-            LOG.warn("Transaction chain failed.", cause);
-            resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
-
-            executor.shutdown();
-        }
+    abstract DOMDataWriteTransaction createTransaction();
 
-        @Override
-        public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
-            LOG.debug("Transaction chain closed successfully.");
-        }
-    }
+    abstract int nextInt(int bound);
 }
index 38e0596..c6744a9 100644 (file)
@@ -23,10 +23,10 @@ public class YnlListener implements OdlMdsalLowlevelTargetListener {
 
     private final String id;
 
-    private AtomicLong localNumber = new AtomicLong();
-    private AtomicLong allNot = new AtomicLong();
-    private AtomicLong idNot = new AtomicLong();
-    private AtomicLong errNot = new AtomicLong();
+    private final AtomicLong localNumber = new AtomicLong();
+    private final AtomicLong allNot = new AtomicLong();
+    private final AtomicLong idNot = new AtomicLong();
+    private final AtomicLong errNot = new AtomicLong();
 
     public YnlListener(final String id) {
         Preconditions.checkNotNull(id);
@@ -42,14 +42,13 @@ public class YnlListener implements OdlMdsalLowlevelTargetListener {
         if (notification.getId().equals(id)) {
             idNot.incrementAndGet();
 
-            localNumber.getAndUpdate((value -> {
+            localNumber.getAndUpdate(value -> {
                 if (notification.getSequenceNumber() - value == 1) {
                     return value + 1;
-                } else {
-                    errNot.getAndIncrement();
-                    return value;
                 }
-            }));
+                errNot.getAndIncrement();
+                return value;
+            });
         }
     }
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.