From 7d53e7984cfbd69fdf78ad07112974aa3e56fc55 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 23 Jun 2017 13:02:40 +0200 Subject: [PATCH] BUG-8494: Cleanup clustering-it-provider Fixes various warnings and refactors MdsalLowLevelTestProvider to be slightly cleaner in terms of number of classes. It also eliminates synchronous thread blocking on future collection and instead schedules task which performs the cleanup if the system gets stuck. Change-Id: I657f3df60c620284538bdf39ab1536eac8448801 Signed-off-by: Robert Varga (cherry picked from commit d97061af6814ad7b085af10797a252aa4aa5cda6) --- .../main/yang/odl-mdsal-lowlevel-control.yang | 85 ++-- .../it/provider/CarDataChangeListener.java | 4 +- .../provider/CarDataTreeChangeListener.java | 10 +- .../CarEntryDataTreeCommitCohort.java | 7 +- .../clustering/it/provider/CarProvider.java | 22 +- .../provider/MdsalLowLevelTestProvider.java | 31 +- .../it/provider/PeopleProvider.java | 6 +- .../impl/AbstractTransactionHandler.java | 224 ++++++++++ .../it/provider/impl/PrefixShardHandler.java | 17 +- .../impl/ProduceTransactionsHandler.java | 215 +++------- .../impl/RoutedGetConstantService.java | 1 - .../impl/WriteTransactionsHandler.java | 389 ++++++------------ .../it/provider/impl/YnlListener.java | 17 +- 13 files changed, 481 insertions(+), 547 deletions(-) create mode 100644 opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java diff --git a/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang b/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang index 89d220bcf5..aa2f2da7dd 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang +++ b/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang @@ -227,6 +227,37 @@ module odl-mdsal-lowlevel-control { } } + grouping transactions-params { + leaf seconds { + description "This RPC has to work (roughly) this long."; + mandatory true; + type uint32; + } + leaf transactions-per-second { + description "An upper limit of transactions per second this RPC shall try to achieve."; + mandatory true; + type uint32; + } + } + + grouping transactions-result { + leaf all-tx { + description "Number of all transactions executed."; + type int64; + mandatory true; + } + leaf insert-tx { + description "Number of transactions that inserted data."; + type int64; + mandatory true; + } + leaf delete-tx { + description "Number of transactions that deleted data."; + type int64; + mandatory true; + } + } + rpc write-transactions { description "Upon receiving this, the member shall make sure the outer list item of llt:id-ints exists for the given id, and then start creating (one by one) @@ -245,16 +276,7 @@ module odl-mdsal-lowlevel-control { OptimisticLockException is always considered an error."; input { uses llc:id-grouping; - leaf seconds { - description "This RPC has to work (roughly) this long."; - mandatory true; - type uint32; - } - leaf transactions-per-second { - description "An upper limit of transactions per second this RPC shall try to achieve."; - mandatory true; - type uint32; - } + uses transactions-params; leaf chained-transactions { description "If true, write transactions shall be created on a transaction chain, (created at start of the RPC call, and deleted at at its end). @@ -264,21 +286,7 @@ module odl-mdsal-lowlevel-control { } } output { - leaf all-tx { - description "Number of all transactions executed."; - type int64; - mandatory true; - } - leaf insert-tx { - description "Number of transactions that inserted data."; - type int64; - mandatory true; - } - leaf delete-tx { - description "Number of transactions that deleted data."; - type int64; - mandatory true; - } + uses transactions-result; } } @@ -305,16 +313,7 @@ module odl-mdsal-lowlevel-control { but the shard and the whole id item shall be kept as they are."; input { uses llc:id-grouping; - leaf seconds { - description "This RPC has to work (roughly) this long."; - mandatory true; - type uint32; - } - leaf transactions-per-second { - description "An upper limit of transactions per second this RPC shall try to achieve."; - mandatory true; - type uint32; - } + uses transactions-params; leaf isolated-transactions { description "The value for DOMDataTreeProducer#createTransaction argument."; mandatory true; @@ -322,21 +321,7 @@ module odl-mdsal-lowlevel-control { } } output { - leaf all-tx { - description "Number of all transactions executed."; - type int64; - mandatory true; - } - leaf insert-tx { - description "Number of transactions that inserted data."; - type int64; - mandatory true; - } - leaf delete-tx { - description "Number of transactions that deleted data."; - type int64; - mandatory true; - } + uses transactions-result; } } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataChangeListener.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataChangeListener.java index ac5c368927..22def4c3b7 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataChangeListener.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataChangeListener.java @@ -27,14 +27,14 @@ public class CarDataChangeListener implements DataChangeListener { private static final Logger LOG = LoggerFactory.getLogger(CarDataChangeListener.class); @Override - public void onDataChanged(AsyncDataChangeEvent, DataObject> change) { + public void onDataChanged(final AsyncDataChangeEvent, DataObject> change) { if (LOG.isTraceEnabled()) { LOG.trace("onDataChanged invoked"); outputChanges(change); } } - private void outputChanges(final AsyncDataChangeEvent, DataObject> change) { + private static void outputChanges(final AsyncDataChangeEvent, DataObject> change) { final Map, DataObject> originalData = change.getOriginalData() != null ? change.getOriginalData(): Collections., DataObject>emptyMap(); final Map, DataObject> updatedData = change.getUpdatedData() != null ? diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataTreeChangeListener.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataTreeChangeListener.java index c157b6cd89..a10e22f871 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataTreeChangeListener.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarDataTreeChangeListener.java @@ -8,13 +8,9 @@ package org.opendaylight.controller.clustering.it.provider; -import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType; -import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.DELETE; -import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED; -import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.WRITE; - import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; +import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType; import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener; import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars; @@ -32,7 +28,7 @@ public class CarDataTreeChangeListener implements DataTreeChangeListener { private static final Logger LOG = LoggerFactory.getLogger(CarDataTreeChangeListener.class); @java.lang.Override - public void onDataTreeChanged(@Nonnull java.util.Collection> changes) { + public void onDataTreeChanged(@Nonnull final java.util.Collection> changes) { if (LOG.isTraceEnabled()) { for (DataTreeModification change : changes) { ouputChanges(change); @@ -40,7 +36,7 @@ public class CarDataTreeChangeListener implements DataTreeChangeListener { } } - private void ouputChanges(final DataTreeModification change) { + private static void ouputChanges(final DataTreeModification change) { final DataObjectModification rootNode = change.getRootNode(); final ModificationType modificationType = rootNode.getModificationType(); final InstanceIdentifier rootIdentifier = change.getRootPath().getRootIdentifier(); diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarEntryDataTreeCommitCohort.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarEntryDataTreeCommitCohort.java index 1bc6581b91..2700159562 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarEntryDataTreeCommitCohort.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarEntryDataTreeCommitCohort.java @@ -39,10 +39,9 @@ public class CarEntryDataTreeCommitCohort implements DOMDataTreeCommitCohort { private static final QName YEAR_QNAME = QName.create(Cars.QNAME, "year").intern(); private static final NodeIdentifier YEAR_NODE_ID = new NodeIdentifier(YEAR_QNAME); - @SuppressWarnings("unchecked") @Override - public CheckedFuture canCommit(Object txId, - DOMDataTreeCandidate candidate, SchemaContext ctx) { + public CheckedFuture canCommit(final Object txId, + final DOMDataTreeCandidate candidate, final SchemaContext ctx) { // Simple data validation - verify the year, if present, is >= 1990 @@ -77,6 +76,6 @@ public class CarEntryDataTreeCommitCohort implements DOMDataTreeCommitCohort { // Return the noop PostCanCommitStep as we're only validating input data and not participating in the // remaining 3PC stages (pre-commit and commit). - return (CheckedFuture) PostCanCommitStep.NOOP_SUCCESS_FUTURE; + return PostCanCommitStep.NOOP_SUCCESS_FUTURE; } } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java index ec4c1bb161..ea9f7b1d76 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java @@ -20,7 +20,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; @@ -63,7 +62,7 @@ public class CarProvider implements CarService { private static final Logger LOG_CAR_PROVIDER = LoggerFactory.getLogger(CarProvider.class); private static final String ENTITY_TYPE = "cars"; - private static final InstanceIdentifier CARS_IID = InstanceIdentifier.builder(Cars.class).build(); + private static final InstanceIdentifier CARS_IID = InstanceIdentifier.builder(Cars.class).build(); private static final DataTreeIdentifier CARS_DTID = new DataTreeIdentifier<>( LogicalDatastoreType.CONFIGURATION, CARS_IID); @@ -76,7 +75,7 @@ public class CarProvider implements CarService { private final CarEntityOwnershipListener ownershipListener = new CarEntityOwnershipListener(); private final AtomicBoolean registeredListener = new AtomicBoolean(); - private final Collection> carsDclRegistrations = + private final Collection> carsDclRegistrations = Sets.newConcurrentHashSet(); private final Collection> carsDtclRegistrations = Sets.newConcurrentHashSet(); @@ -86,8 +85,8 @@ public class CarProvider implements CarService { private final AtomicReference> commitCohortReg = new AtomicReference<>(); - public CarProvider(DataBroker dataProvider, EntityOwnershipService ownershipService, - DOMDataBroker domDataBroker) { + public CarProvider(final DataBroker dataProvider, final EntityOwnershipService ownershipService, + final DOMDataBroker domDataBroker) { this.dataProvider = dataProvider; this.ownershipService = ownershipService; this.domDataBroker = domDataBroker; @@ -110,7 +109,7 @@ public class CarProvider implements CarService { } @Override - public Future> stressTest(StressTestInput input) { + public Future> stressTest(final StressTestInput input) { final int inputRate; final long inputCount; @@ -120,10 +119,9 @@ public class CarProvider implements CarService { return Futures.immediateFuture(RpcResultBuilder.failed() .withError(ErrorType.PROTOCOL, "invalid rate") .build()); - } else { - inputRate = input.getRate(); } + inputRate = input.getRate(); if (input.getCount() != null) { inputCount = input.getCount(); } else { @@ -218,7 +216,7 @@ public class CarProvider implements CarService { @Override - public Future> registerOwnership(RegisterOwnershipInput input) { + public Future> registerOwnership(final RegisterOwnershipInput input) { if(registeredListener.compareAndSet(false, true)) { ownershipService.registerListener(ENTITY_TYPE, ownershipListener); } @@ -235,13 +233,13 @@ public class CarProvider implements CarService { } @Override - public Future> unregisterOwnership(UnregisterOwnershipInput input) { + public Future> unregisterOwnership(final UnregisterOwnershipInput input) { return RpcResultBuilder.success().buildFuture(); } private static class CarEntityOwnershipListener implements EntityOwnershipListener { @Override - public void ownershipChanged(EntityOwnershipChange ownershipChange) { + public void ownershipChanged(final EntityOwnershipChange ownershipChange) { LOG_CAR_PROVIDER.info("ownershipChanged: {}", ownershipChange); } } @@ -278,7 +276,7 @@ public class CarProvider implements CarService { LOG_CAR_PROVIDER.info("Unregistering the CarDataChangeListener(s)"); synchronized (carsDclRegistrations) { int numListeners = 0; - for (ListenerRegistration carsDclRegistration : carsDclRegistrations) { + for (ListenerRegistration carsDclRegistration : carsDclRegistrations) { carsDclRegistration.close(); numListeners++; } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index a93d99fa45..e0e8d99d1a 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -137,17 +137,17 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private final DOMDataTreeChangeService domDataTreeChangeService; private final ActorSystem actorSystem; - private Map, DOMRpcImplementationRegistration> routedRegistrations = + private final Map, DOMRpcImplementationRegistration> routedRegistrations = new HashMap<>(); - private Map> ynlRegistrations = new HashMap<>(); + private final Map> ynlRegistrations = new HashMap<>(); private DOMRpcImplementationRegistration globalGetConstantRegistration = null; private ClusterSingletonServiceRegistration getSingletonConstantRegistration; private FlappingSingletonService flappingSingletonService; private ListenerRegistration dtclReg; private IdIntsListener idIntsListener; - private Map publishNotificationsTasks = new HashMap<>(); + private final Map publishNotificationsTasks = new HashMap<>(); private ListenerRegistration ddtlReg; private IdIntsDOMDataTreeLIstener idIntsDdtl; @@ -252,13 +252,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> writeTransactions(final WriteTransactionsInput input) { LOG.debug("write-transactions, input: {}", input); - - final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input); - - final SettableFuture> settableFuture = SettableFuture.create(); - writeTransactionsHandler.start(settableFuture); - - return settableFuture; + return WriteTransactionsHandler.start(domDataBroker, input); } @Override @@ -267,7 +261,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> removeShardReplica(RemoveShardReplicaInput input) { + public Future> removeShardReplica(final RemoveShardReplicaInput input) { return null; } @@ -339,7 +333,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> registerDefaultConstant(RegisterDefaultConstantInput input) { + public Future> registerDefaultConstant(final RegisterDefaultConstantInput input) { return null; } @@ -381,7 +375,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> addShardReplica(AddShardReplicaInput input) { + public Future> addShardReplica(final AddShardReplicaInput input) { return null; } @@ -573,14 +567,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService @Override public Future> produceTransactions(final ProduceTransactionsInput input) { LOG.debug("producer-transactions, input: {}", input); - - final ProduceTransactionsHandler handler = - new ProduceTransactionsHandler(domDataTreeService, input); - - final SettableFuture> settableFuture = SettableFuture.create(); - handler.start(settableFuture); - - return settableFuture; + return ProduceTransactionsHandler.start(domDataTreeService, input); } @Override @@ -601,7 +588,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService public Future> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) { LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input); - final InstanceIdentifier shardPrefix = input.getPrefix(); + final InstanceIdentifier shardPrefix = input.getPrefix(); if (shardPrefix == null) { final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element", diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/PeopleProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/PeopleProvider.java index 48385b4e2f..09822831b2 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/PeopleProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/PeopleProvider.java @@ -43,12 +43,12 @@ public class PeopleProvider implements PeopleService, AutoCloseable { } - public void setRpcRegistration(BindingAwareBroker.RoutedRpcRegistration rpcRegistration) { + public void setRpcRegistration(final BindingAwareBroker.RoutedRpcRegistration rpcRegistration) { this.rpcRegistration = rpcRegistration; } @Override - public Future> addPerson(AddPersonInput input) { + public Future> addPerson(final AddPersonInput input) { LOG.info("RPC addPerson : adding person [{}]", input); PersonBuilder builder = new PersonBuilder(input); @@ -59,7 +59,7 @@ public class PeopleProvider implements PeopleService, AutoCloseable { final InstanceIdentifier.InstanceIdentifierBuilder personIdBuilder = InstanceIdentifier.builder(People.class) .child(Person.class, person.getKey()); - final InstanceIdentifier personId = personIdBuilder.build(); + final InstanceIdentifier personId = personIdBuilder.build(); // Place entry in data store tree WriteTransaction tx = dataProvider.newWriteOnlyTransaction(); tx.put(LogicalDatastoreType.CONFIGURATION, personId, person, true); diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java new file mode 100644 index 0000000000..f23e7ec194 --- /dev/null +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.clustering.it.provider.impl; + +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class AbstractTransactionHandler { + private abstract static class Phase { + abstract void txSuccess(ListenableFuture execFuture, long txId); + + abstract void txFailure(ListenableFuture execFuture, long txId, Throwable cause); + } + + private static final class Running extends Phase { + private final Queue> futures = new ArrayDeque<>(); + private Throwable failure; + + void addFuture(final ListenableFuture execFuture) { + futures.add(execFuture); + } + + @Override + void txSuccess(final ListenableFuture execFuture, final long txId) { + futures.remove(execFuture); + } + + @Override + void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { + futures.remove(execFuture); + if (failure != null) { + failure = cause; + } + } + + Optional getFailure() { + return Optional.ofNullable(failure); + } + } + + private final class Collecting extends Phase { + private final List> futures; + private boolean done; + + Collecting(final Collection> futures) { + this.futures = new ArrayList<>(futures); + } + + @Override + void txSuccess(final ListenableFuture execFuture, final long txId) { + futures.remove(execFuture); + if (futures.isEmpty() && !done) { + LOG.debug("All futures completed successfully."); + runSuccessful(txCounter); + } + } + + @Override + void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { + futures.remove(execFuture); + done = true; + runFailed(cause); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class); + + static final int SECOND_AS_NANO = 1000000000; + //2^20 as in the model + static final int MAX_ITEM = 1048576; + + static final QName ID_INTS = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern(); + static final QName ID = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern(); + static final QName ITEM = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern(); + static final QName NUMBER = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern(); + + public static final QName ID_INT = + QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern(); + public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS); + public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized(); + + static final long INIT_TX_TIMEOUT_SECONDS = 125; + + private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final Stopwatch stopwatch = Stopwatch.createStarted(); + private final long runtimeNanos; + private final long delayNanos; + + private ScheduledFuture scheduledFuture; + private long txCounter; + @GuardedBy("this") + private Phase phase; + + AbstractTransactionHandler(final TransactionsParams params) { + runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds()); + delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond(); + } + + final synchronized void doStart() { + phase = new Running(); + scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS); + } + + private void execute() { + final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); + if (elapsed < runtimeNanos) { + // Not completed yet: create a transaction and hook it up + final long txId = txCounter++; + final ListenableFuture execFuture = execWrite(txId); + + // Ordering is important: we need to add the future before hooking the callback + synchronized (this) { + ((Running) phase).addFuture(execFuture); + } + Futures.addCallback(execFuture, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + txSuccess(execFuture, txId); + } + + @Override + public void onFailure(final Throwable cause) { + txFailure(execFuture, txId, cause); + } + }); + } else { + startCollection(); + } + } + + private synchronized void startCollection() { + scheduledFuture.cancel(false); + + final Running running = (Running) phase; + final Optional failure = running.getFailure(); + if (failure.isPresent()) { + executor.shutdown(); + runFailed(failure.get()); + return; + } + + LOG.debug("Reached maximum run time with {} outstanding futures", running.futures.size()); + if (running.futures.isEmpty()) { + executor.shutdown(); + runSuccessful(txCounter); + return; + } + + phase = new Collecting(running.futures); + executor.schedule(this::checkCollection, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); + executor.shutdown(); + } + + final synchronized void txSuccess(final ListenableFuture execFuture, final long txId) { + LOG.debug("Future #{} completed successfully", txId); + phase.txSuccess(execFuture, txId); + } + + final synchronized void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { + LOG.debug("Future #{} failed", txId, cause); + phase.txFailure(execFuture, txId, cause); + } + + private synchronized void checkCollection() { + final Collecting collecting = (Collecting) phase; + if (!collecting.done) { + final int size = collecting.futures.size(); + for (int i = 0; i < size; i++) { + final ListenableFuture future = collecting.futures.get(i); + + try { + future.get(0, TimeUnit.NANOSECONDS); + } catch (final TimeoutException e) { + LOG.warn("Future #{}/{} not completed yet", i, size); + } catch (final ExecutionException e) { + LOG.warn("Future #{}/{} failed", i, size, e.getCause()); + } catch (final InterruptedException e) { + LOG.warn("Interrupted while examining future #{}/{}", i, size, e); + } + } + + runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds")); + } + } + + abstract ListenableFuture execWrite(final long txId); + + abstract void runFailed(Throwable cause); + + abstract void runSuccessful(long allTx); + + abstract void runTimedOut(Exception cause); +} diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java index 8f711b3337..4ffb415d82 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java @@ -8,10 +8,10 @@ package org.opendaylight.controller.clustering.it.provider.impl; -import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID; -import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID_INT; -import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID_INTS; -import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ITEM; +import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID; +import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INT; +import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INTS; +import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ITEM; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; @@ -38,7 +38,6 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeService; import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; @@ -94,13 +93,13 @@ public class PrefixShardHandler { final CheckedFuture ensureFuture = ensureListExists(); Futures.addCallback(ensureFuture, new FutureCallback() { @Override - public void onSuccess(@Nullable Void result) { + public void onSuccess(@Nullable final Void result) { LOG.debug("Initial list write successful."); future.set(RpcResultBuilder.success().build()); } @Override - public void onFailure(Throwable throwable) { + public void onFailure(final Throwable throwable) { LOG.warn("Shard[{}] creation failed:", identifier, throwable); final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed", @@ -189,7 +188,7 @@ public class PrefixShardHandler { final CheckedFuture future = tx.submit(); Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(@Nullable Void result) { + public void onSuccess(@Nullable final Void result) { try { LOG.debug("Closing producer for initial list."); producer.close(); @@ -199,7 +198,7 @@ public class PrefixShardHandler { } @Override - public void onFailure(Throwable throwable) { + public void onFailure(final Throwable throwable) { //NOOP handled by the caller of this method. } }); diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java index 2a1f5ae47d..b1348c2fc4 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java @@ -8,23 +8,15 @@ package org.opendaylight.controller.clustering.it.provider.impl; -import com.google.common.base.Stopwatch; -import com.google.common.util.concurrent.FutureCallback; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.SplittableRandom; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; @@ -35,7 +27,6 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -47,81 +38,34 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ProduceTransactionsHandler implements Runnable { - +public class ProduceTransactionsHandler extends AbstractTransactionHandler { private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class); - private static final int SECOND_AS_NANO = 1000000000; - //2^20 as in the model - private static final int MAX_ITEM = 1048576; - - static final QName ID_INTS = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern(); - public static final QName ID_INT = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern(); - static final QName ID = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern(); - static final QName ITEM = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern(); - private static final QName NUMBER = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number".intern()); - - public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS); - public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized(); - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final List> futures = new ArrayList<>(); - private final Set usedValues = new HashSet<>(); + private final SettableFuture> future = SettableFuture.create(); private final SplittableRandom random = new SplittableRandom(); + private final Set usedValues = new HashSet<>(); + private final DOMDataTreeIdentifier idListItem; + private final DOMDataTreeProducer itemProducer; - private final DOMDataTreeService domDataTreeService; - private final long runtimeNanos; - private final long delayNanos; - private final String id; - - private SettableFuture> completionFuture; - private Stopwatch stopwatch; - - private long allTx = 0; private long insertTx = 0; private long deleteTx = 0; - private ScheduledFuture scheduledFuture; - private DOMDataTreeProducer itemProducer; - private DOMDataTreeIdentifier idListItem; - - public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService, - final ProduceTransactionsInput input) { - - this.domDataTreeService = domDataTreeService; - runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds()); - delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond(); - id = input.getId(); - } - - @Override - public void run() { - futures.add(execWrite(futures.size())); - maybeFinish(); + private ProduceTransactionsHandler(final DOMDataTreeProducer producer, final DOMDataTreeIdentifier idListItem, + final ProduceTransactionsInput input) { + super(input); + this.itemProducer = Preconditions.checkNotNull(producer); + this.idListItem = Preconditions.checkNotNull(idListItem); } - public void start(final SettableFuture> settableFuture) { - completionFuture = settableFuture; - - if (fillInitialList(completionFuture)) { - stopwatch = Stopwatch.createStarted(); - scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS); - } else { - executor.shutdown(); - } - } - - private boolean fillInitialList(final SettableFuture> settableFuture) { - LOG.debug("Filling the item list with initial values."); + public static ListenableFuture> start( + final DOMDataTreeService domDataTreeService, final ProduceTransactionsInput input) { + final String id = input.getId(); + LOG.debug("Filling the item list {} with initial values.", id); final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id)); - itemProducer = domDataTreeService.createProducer( - Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); + final DOMDataTreeProducer itemProducer = domDataTreeService.createProducer( + Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); final DOMDataTreeWriteCursor cursor = @@ -131,33 +75,34 @@ public class ProduceTransactionsHandler implements Runnable { cursor.write(list.getIdentifier(), list); cursor.close(); - idListItem = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, - idListWithKey.node(list.getIdentifier()).toOptimized()); - try { - tx.submit().checkedGet(125, TimeUnit.SECONDS); - return true; + tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (final Exception e) { LOG.warn("Unable to fill the initial item list.", e); - settableFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - } - try { - itemProducer.close(); - } catch (final DOMDataTreeProducerException exception) { - LOG.warn("Failure while closing producer.", exception); + try { + itemProducer.close(); + } catch (final DOMDataTreeProducerException exception) { + LOG.warn("Failure while closing producer.", exception); + } + + return Futures.immediateFuture(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); } - return false; + + final ProduceTransactionsHandler handler = new ProduceTransactionsHandler(itemProducer, + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(list.getIdentifier()) + .toOptimized()), input); + handler.doStart(); + return handler.future; } - private ListenableFuture execWrite(final int offset) { + @Override + ListenableFuture execWrite(final long txId) { final int i = random.nextInt(MAX_ITEM + 1); final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem); - allTx++; - final NodeIdentifierWithPredicates entryId = new NodeIdentifierWithPredicates(ITEM, NUMBER, i); if (usedValues.contains(i)) { LOG.debug("Deleting item: {}", i); @@ -177,80 +122,30 @@ public class ProduceTransactionsHandler implements Runnable { cursor.close(); - final ListenableFuture future = tx.submit(); - if (LOG.isDebugEnabled()) { - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - LOG.debug("Future #{} completed successfully", offset); - } - - @Override - public void onFailure(final Throwable cause) { - LOG.debug("Future #{} failed", offset, cause); - } - }); - } - - return future; + return tx.submit(); } - private void maybeFinish() { - final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); - if (elapsed >= runtimeNanos) { - LOG.debug("Reached max running time, waiting for futures to complete."); - scheduledFuture.cancel(false); - - final ListenableFuture> allFutures = Futures.allAsList(futures); - - try { - // Timeout from cds should be 2 minutes so leave some leeway. - allFutures.get(125, TimeUnit.SECONDS); - - LOG.debug("All futures completed successfully."); - - final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder() - .setAllTx(allTx) - .setInsertTx(insertTx) - .setDeleteTx(deleteTx) - .build(); - completionFuture.set(RpcResultBuilder.success() - .withResult(output).build()); - } catch (ExecutionException e) { - LOG.error("Write transactions failed.", e.getCause()); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build()); - } catch (InterruptedException | TimeoutException e) { - LOG.error("Write transactions failed.", e); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, - "Final submit was timed out by the test provider or was interrupted", e).build()); - - for (int i = 0; i < futures.size(); i++) { - final ListenableFuture future = futures.get(i); + @Override + void runFailed(final Throwable cause) { + future.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build()); + } - try { - future.get(0, TimeUnit.NANOSECONDS); - } catch (TimeoutException fe) { - LOG.warn("Future #{}/{} not completed yet", i, futures.size()); - } catch (ExecutionException fe) { - LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause()); - } catch (InterruptedException fe) { - LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e); - } - } - } catch (Exception e) { - LOG.error("Write transactions failed.", e); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - } + @Override + void runSuccessful(final long allTx) { + final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder() + .setAllTx(allTx) + .setInsertTx(insertTx) + .setDeleteTx(deleteTx) + .build(); + future.set(RpcResultBuilder.success() + .withResult(output).build()); + } - executor.shutdown(); - try { - itemProducer.close(); - } catch (final DOMDataTreeProducerException e) { - LOG.warn("Failure while closing item producer.", e); - } - } + @Override + void runTimedOut(final Exception cause) { + future.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, + "Final submit was timed out by the test provider or was interrupted", cause).build()); } } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/RoutedGetConstantService.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/RoutedGetConstantService.java index b4bc304a62..92b741d929 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/RoutedGetConstantService.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/RoutedGetConstantService.java @@ -12,7 +12,6 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation; diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java index a026e6f2f3..797e252a91 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java @@ -8,22 +8,14 @@ package org.opendaylight.controller.clustering.it.provider.impl; -import com.google.common.base.Stopwatch; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedHashSet; -import java.util.List; import java.util.Set; import java.util.SplittableRandom; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; @@ -38,7 +30,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -53,155 +44,163 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableCo import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class WriteTransactionsHandler implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class); - private static final int SECOND_AS_NANO = 1000000000; - //2^20 as in the model - private static final int MAX_ITEM = 1048576; - - private static final QName ID_INTS = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern(); - private static final QName ID_INT = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern(); - private static final QName ID = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern(); - private static final QName ITEM = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern(); - private static final QName NUMBER = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern(); - - public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS); - public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized(); - - private final WriteTransactionsInput input; - - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final List> futures = new ArrayList<>(); - private final Set usedValues = new HashSet<>(); - - private RandomnessProvider random; - private TxProvider txProvider; +public abstract class WriteTransactionsHandler extends AbstractTransactionHandler { + private static final class Chained extends WriteTransactionsHandler implements TransactionChainListener { + private final SplittableRandom random = new SplittableRandom(); + private final DOMTransactionChain transactionChain; - private final DOMDataBroker domDataBroker; - private final Long runtimeNanos; - private final Long delayNanos; - private final String id; + Chained(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem, + final WriteTransactionsInput input) { + super(idListItem, input); + transactionChain = dataBroker.createTransactionChain(this); + } - private SettableFuture> completionFuture; - private Stopwatch stopwatch; + @Override + DOMDataWriteTransaction createTransaction() { + return transactionChain.newWriteOnlyTransaction(); + } - private long allTx = 0; - private long insertTx = 0; - private long deleteTx = 0; - private ScheduledFuture scheduledFuture; - private YangInstanceIdentifier idListItem; + @Override + int nextInt(final int bound) { + return random.nextInt(bound); + } - public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) { - this.domDataBroker = domDataBroker; - this.input = input; + @Override + public void onTransactionChainFailed(final TransactionChain chain, + final AsyncTransaction transaction, final Throwable cause) { + LOG.warn("Transaction chain failed.", cause); + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build()); + } - runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds()); - delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond(); - id = input.getId(); + @Override + public void onTransactionChainSuccessful(final TransactionChain chain) { + LOG.debug("Transaction chain closed successfully."); + } } - @Override - public void run() { - futures.add(execWrite(futures.size())); - maybeFinish(); - } + private static final class Simple extends WriteTransactionsHandler { + private final LinkedHashSet previousNumbers = new LinkedHashSet<>(); + private final SplittableRandom random = new SplittableRandom(); + private final DOMDataBroker dataBroker; - public void start(final SettableFuture> settableFuture) { - LOG.debug("Starting write-transactions."); + Simple(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem, + final WriteTransactionsInput input) { + super(idListItem, input); + this.dataBroker = Preconditions.checkNotNull(dataBroker); + } - if (input.isChainedTransactions()) { - txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor); - random = new BasicProvider(); - } else { - txProvider = new DataBrokerBackedProvider(domDataBroker); - random = new NonConflictingProvider(); + @Override + DOMDataWriteTransaction createTransaction() { + return dataBroker.newWriteOnlyTransaction(); } - if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) { - stopwatch = Stopwatch.createStarted(); - completionFuture = settableFuture; - scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS); - } else { - executor.shutdown(); + @Override + int nextInt(final int bound) { + int nextInt; + do { + nextInt = random.nextInt(bound); + } while (previousNumbers.contains(nextInt)); + + if (previousNumbers.size() > 100000) { + previousNumbers.iterator().remove(); + } + previousNumbers.add(nextInt); + + return nextInt; } } - private boolean ensureListExists(final SettableFuture> settableFuture) { + private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class); + + final SettableFuture> completionFuture = SettableFuture.create(); + private final Set usedValues = new HashSet<>(); + private final YangInstanceIdentifier idListItem; + + private long insertTx = 0; + private long deleteTx = 0; + + WriteTransactionsHandler(final YangInstanceIdentifier idListItem, final WriteTransactionsInput input) { + super(input); + this.idListItem = Preconditions.checkNotNull(idListItem); + } + + public static ListenableFuture> start(final DOMDataBroker domDataBroker, + final WriteTransactionsInput input) { + LOG.debug("Starting write-transactions."); + + final String id = input.getId(); + final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id) + .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) + .build(); + final YangInstanceIdentifier idListItem = ID_INT_YID.node(entry.getIdentifier()); final ContainerNode containerNode = ImmutableContainerNodeBuilder.create() .withNodeIdentifier(new NodeIdentifier(ID_INTS)) .withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build()) .build(); - DOMDataWriteTransaction tx = txProvider.createTransaction(); + DOMDataWriteTransaction tx = domDataBroker.newWriteOnlyTransaction(); // write only the top list tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode); try { - tx.submit().checkedGet(125, TimeUnit.SECONDS); + tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (final OptimisticLockFailedException e) { // when multiple write-transactions are executed concurrently we need to ignore this. // If we get optimistic lock here it means id-ints already exists and we can continue. LOG.debug("Got an optimistic lock when writing initial top level list element.", e); } catch (final TransactionCommitFailedException | TimeoutException e) { LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); - settableFuture.set(RpcResultBuilder.failed() + return Futures.immediateFuture(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; } - final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id) - .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) - .build(); - - idListItem = ID_INT_YID.node(entry.getIdentifier()); - tx = txProvider.createTransaction(); + tx = domDataBroker.newWriteOnlyTransaction(); tx.merge(LogicalDatastoreType.CONFIGURATION, idListItem, entry); try { - tx.submit().checkedGet(125, TimeUnit.SECONDS); - return true; + tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (final Exception e) { LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); - settableFuture.set(RpcResultBuilder.failed() + return Futures.immediateFuture(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; } - } - private boolean fillInitialList(final SettableFuture> settableFuture) { LOG.debug("Filling the item list with initial values."); final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM); final YangInstanceIdentifier itemListId = idListItem.node(ITEM); - final DOMDataWriteTransaction tx = txProvider.createTransaction(); + tx = domDataBroker.newWriteOnlyTransaction(); tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build()); try { - tx.submit().checkedGet(125, TimeUnit.SECONDS); - return true; + tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (final Exception e) { LOG.warn("Unable to fill the initial item list.", e); - settableFuture.set(RpcResultBuilder.failed() + return Futures.immediateFuture(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; } + + final WriteTransactionsHandler handler; + if (input.isChainedTransactions()) { + handler = new Chained(domDataBroker, idListItem, input); + } else { + handler = new Simple(domDataBroker, idListItem, input); + } + + handler.doStart(); + return handler.completionFuture; } - private ListenableFuture execWrite(final int offset) { - final int i = random.nextInt(MAX_ITEM + 1); + @Override + ListenableFuture execWrite(final long txId) { + final int i = nextInt(MAX_ITEM + 1); final YangInstanceIdentifier entryId = idListItem.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i)); - final DOMDataWriteTransaction tx = txProvider.createTransaction(); - allTx++; + final DOMDataWriteTransaction tx = createTransaction(); if (usedValues.contains(i)) { LOG.debug("Deleting item: {}", i); @@ -217,181 +216,35 @@ public class WriteTransactionsHandler implements Runnable { usedValues.add(i); } - final ListenableFuture future = tx.submit(); - if (LOG.isDebugEnabled()) { - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - LOG.debug("Future #{} completed successfully", offset); - } - - @Override - public void onFailure(final Throwable cause) { - LOG.debug("Future #{} failed", offset, cause); - } - }); - } - - return future; - } - - private void maybeFinish() { - final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); - if (elapsed >= runtimeNanos) { - LOG.debug("Reached max running time, waiting for futures to complete."); - scheduledFuture.cancel(false); - - final ListenableFuture> allFutures = Futures.allAsList(futures); - - try { - // Timeout from cds should be 2 minutes so leave some leeway. - allFutures.get(125, TimeUnit.SECONDS); - - LOG.debug("All futures completed successfully."); - - final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder() - .setAllTx(allTx) - .setInsertTx(insertTx) - .setDeleteTx(deleteTx) - .build(); - - completionFuture.set(RpcResultBuilder.success() - .withResult(output).build()); - - executor.shutdown(); - } catch (final ExecutionException e) { - LOG.error("Write transactions failed.", e.getCause()); - - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build()); - } catch (InterruptedException | TimeoutException e) { - LOG.error("Write transactions failed.", e); - - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, - "Final submit was timed out by the test provider or was interrupted", e).build()); - - for (int i = 0; i < futures.size(); i++) { - final ListenableFuture future = futures.get(i); - - try { - future.get(0, TimeUnit.NANOSECONDS); - } catch (final TimeoutException fe) { - LOG.warn("Future #{}/{} not completed yet", i, futures.size()); - } catch (final ExecutionException fe) { - LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause()); - } catch (final InterruptedException fe) { - LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e); - } - } - } catch (Exception exception) { - LOG.error("Write transactions failed.", exception); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build()); - - executor.shutdown(); - } - } - } - - private interface RandomnessProvider { - int nextInt(int bound); - } - - private static class NonConflictingProvider implements RandomnessProvider { - - private final SplittableRandom random = new SplittableRandom(); - private final LinkedHashSet previousNumbers = new LinkedHashSet<>(); - - @Override - public int nextInt(int bound) { - int nextInt; - do { - nextInt = random.nextInt(bound); - } while (previousNumbers.contains(nextInt)); - - if (previousNumbers.size() > 100000) { - previousNumbers.iterator().remove(); - } - previousNumbers.add(nextInt); - - return nextInt; - } + return tx.submit(); } - private static class BasicProvider implements RandomnessProvider { - - private final SplittableRandom random = new SplittableRandom(); - - @Override - public int nextInt(int bound) { - return random.nextInt(bound); - } - } - - private interface TxProvider { - - DOMDataWriteTransaction createTransaction(); + @Override + void runFailed(final Throwable cause) { + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build()); } - private static class TxChainBackedProvider implements TxProvider { - - private final DOMTransactionChain transactionChain; - - TxChainBackedProvider(final DOMDataBroker dataBroker, - final SettableFuture> completionFuture, - final ScheduledExecutorService executor) { - - transactionChain = - dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor)); - } + @Override + void runSuccessful(final long allTx) { + final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder() + .setAllTx(allTx) + .setInsertTx(insertTx) + .setDeleteTx(deleteTx) + .build(); - @Override - public DOMDataWriteTransaction createTransaction() { - return transactionChain.newWriteOnlyTransaction(); - } + completionFuture.set(RpcResultBuilder.success() + .withResult(output).build()); } - private static class DataBrokerBackedProvider implements TxProvider { - - private final DOMDataBroker dataBroker; - - DataBrokerBackedProvider(final DOMDataBroker dataBroker) { - this.dataBroker = dataBroker; - } - - @Override - public DOMDataWriteTransaction createTransaction() { - return dataBroker.newWriteOnlyTransaction(); - } + @Override + void runTimedOut(final Exception cause) { + completionFuture.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, + "Final submit was timed out by the test provider or was interrupted", cause).build()); } - private static class TestChainListener implements TransactionChainListener { - - private final SettableFuture> resultFuture; - private final ScheduledExecutorService executor; - - TestChainListener(final SettableFuture> resultFuture, - final ScheduledExecutorService executor) { - - this.resultFuture = resultFuture; - this.executor = executor; - } - - @Override - public void onTransactionChainFailed(final TransactionChain chain, - final AsyncTransaction transaction, - final Throwable cause) { - LOG.warn("Transaction chain failed.", cause); - resultFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build()); - - executor.shutdown(); - } + abstract DOMDataWriteTransaction createTransaction(); - @Override - public void onTransactionChainSuccessful(final TransactionChain chain) { - LOG.debug("Transaction chain closed successfully."); - } - } + abstract int nextInt(int bound); } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/YnlListener.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/YnlListener.java index 38e0596a07..c6744a9393 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/YnlListener.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/YnlListener.java @@ -23,10 +23,10 @@ public class YnlListener implements OdlMdsalLowlevelTargetListener { private final String id; - private AtomicLong localNumber = new AtomicLong(); - private AtomicLong allNot = new AtomicLong(); - private AtomicLong idNot = new AtomicLong(); - private AtomicLong errNot = new AtomicLong(); + private final AtomicLong localNumber = new AtomicLong(); + private final AtomicLong allNot = new AtomicLong(); + private final AtomicLong idNot = new AtomicLong(); + private final AtomicLong errNot = new AtomicLong(); public YnlListener(final String id) { Preconditions.checkNotNull(id); @@ -42,14 +42,13 @@ public class YnlListener implements OdlMdsalLowlevelTargetListener { if (notification.getId().equals(id)) { idNot.incrementAndGet(); - localNumber.getAndUpdate((value -> { + localNumber.getAndUpdate(value -> { if (notification.getSequenceNumber() - value == 1) { return value + 1; - } else { - errNot.getAndIncrement(); - return value; } - })); + errNot.getAndIncrement(); + return value; + }); } } -- 2.36.6