}
}
+ grouping transactions-params {
+ leaf seconds {
+ description "This RPC has to work (roughly) this long.";
+ mandatory true;
+ type uint32;
+ }
+ leaf transactions-per-second {
+ description "An upper limit of transactions per second this RPC shall try to achieve.";
+ mandatory true;
+ type uint32;
+ }
+ }
+
+ grouping transactions-result {
+ leaf all-tx {
+ description "Number of all transactions executed.";
+ type int64;
+ mandatory true;
+ }
+ leaf insert-tx {
+ description "Number of transactions that inserted data.";
+ type int64;
+ mandatory true;
+ }
+ leaf delete-tx {
+ description "Number of transactions that deleted data.";
+ type int64;
+ mandatory true;
+ }
+ }
+
rpc write-transactions {
description "Upon receiving this, the member shall make sure the outer list item
of llt:id-ints exists for the given id, and then start creating (one by one)
OptimisticLockException is always considered an error.";
input {
uses llc:id-grouping;
- leaf seconds {
- description "This RPC has to work (roughly) this long.";
- mandatory true;
- type uint32;
- }
- leaf transactions-per-second {
- description "An upper limit of transactions per second this RPC shall try to achieve.";
- mandatory true;
- type uint32;
- }
+ uses transactions-params;
leaf chained-transactions {
description "If true, write transactions shall be created on a transaction chain,
(created at start of the RPC call, and deleted at at its end).
}
}
output {
- leaf all-tx {
- description "Number of all transactions executed.";
- type int64;
- mandatory true;
- }
- leaf insert-tx {
- description "Number of transactions that inserted data.";
- type int64;
- mandatory true;
- }
- leaf delete-tx {
- description "Number of transactions that deleted data.";
- type int64;
- mandatory true;
- }
+ uses transactions-result;
}
}
but the shard and the whole id item shall be kept as they are.";
input {
uses llc:id-grouping;
- leaf seconds {
- description "This RPC has to work (roughly) this long.";
- mandatory true;
- type uint32;
- }
- leaf transactions-per-second {
- description "An upper limit of transactions per second this RPC shall try to achieve.";
- mandatory true;
- type uint32;
- }
+ uses transactions-params;
leaf isolated-transactions {
description "The value for DOMDataTreeProducer#createTransaction argument.";
mandatory true;
}
}
output {
- leaf all-tx {
- description "Number of all transactions executed.";
- type int64;
- mandatory true;
- }
- leaf insert-tx {
- description "Number of transactions that inserted data.";
- type int64;
- mandatory true;
- }
- leaf delete-tx {
- description "Number of transactions that deleted data.";
- type int64;
- mandatory true;
- }
+ uses transactions-result;
}
}
private static final Logger LOG = LoggerFactory.getLogger(CarDataChangeListener.class);
@Override
- public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+ public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
if (LOG.isTraceEnabled()) {
LOG.trace("onDataChanged invoked");
outputChanges(change);
}
}
- private void outputChanges(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+ private static void outputChanges(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
final Map<InstanceIdentifier<?>, DataObject> originalData = change.getOriginalData() != null ?
change.getOriginalData(): Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
final Map<InstanceIdentifier<?>, DataObject> updatedData = change.getUpdatedData() != null ?
package org.opendaylight.controller.clustering.it.provider;
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.DELETE;
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED;
-import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.WRITE;
-
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
private static final Logger LOG = LoggerFactory.getLogger(CarDataTreeChangeListener.class);
@java.lang.Override
- public void onDataTreeChanged(@Nonnull java.util.Collection<DataTreeModification<Cars>> changes) {
+ public void onDataTreeChanged(@Nonnull final java.util.Collection<DataTreeModification<Cars>> changes) {
if (LOG.isTraceEnabled()) {
for (DataTreeModification<Cars> change : changes) {
ouputChanges(change);
}
}
- private void ouputChanges(final DataTreeModification<Cars> change) {
+ private static void ouputChanges(final DataTreeModification<Cars> change) {
final DataObjectModification<Cars> rootNode = change.getRootNode();
final ModificationType modificationType = rootNode.getModificationType();
final InstanceIdentifier<Cars> rootIdentifier = change.getRootPath().getRootIdentifier();
private static final QName YEAR_QNAME = QName.create(Cars.QNAME, "year").intern();
private static final NodeIdentifier YEAR_NODE_ID = new NodeIdentifier(YEAR_QNAME);
- @SuppressWarnings("unchecked")
@Override
- public CheckedFuture<PostCanCommitStep, DataValidationFailedException> canCommit(Object txId,
- DOMDataTreeCandidate candidate, SchemaContext ctx) {
+ public CheckedFuture<PostCanCommitStep, DataValidationFailedException> canCommit(final Object txId,
+ final DOMDataTreeCandidate candidate, final SchemaContext ctx) {
// Simple data validation - verify the year, if present, is >= 1990
// Return the noop PostCanCommitStep as we're only validating input data and not participating in the
// remaining 3PC stages (pre-commit and commit).
- return (CheckedFuture<PostCanCommitStep, DataValidationFailedException>) PostCanCommitStep.NOOP_SUCCESS_FUTURE;
+ return PostCanCommitStep.NOOP_SUCCESS_FUTURE;
}
}
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
private static final Logger LOG_CAR_PROVIDER = LoggerFactory.getLogger(CarProvider.class);
private static final String ENTITY_TYPE = "cars";
- private static final InstanceIdentifier CARS_IID = InstanceIdentifier.builder(Cars.class).build();
+ private static final InstanceIdentifier<Cars> CARS_IID = InstanceIdentifier.builder(Cars.class).build();
private static final DataTreeIdentifier<Cars> CARS_DTID = new DataTreeIdentifier<>(
LogicalDatastoreType.CONFIGURATION, CARS_IID);
private final CarEntityOwnershipListener ownershipListener = new CarEntityOwnershipListener();
private final AtomicBoolean registeredListener = new AtomicBoolean();
- private final Collection<ListenerRegistration<DataChangeListener>> carsDclRegistrations =
+ private final Collection<ListenerRegistration<?>> carsDclRegistrations =
Sets.newConcurrentHashSet();
private final Collection<ListenerRegistration<CarDataTreeChangeListener>> carsDtclRegistrations =
Sets.newConcurrentHashSet();
private final AtomicReference<DOMDataTreeCommitCohortRegistration<CarEntryDataTreeCommitCohort>> commitCohortReg =
new AtomicReference<>();
- public CarProvider(DataBroker dataProvider, EntityOwnershipService ownershipService,
- DOMDataBroker domDataBroker) {
+ public CarProvider(final DataBroker dataProvider, final EntityOwnershipService ownershipService,
+ final DOMDataBroker domDataBroker) {
this.dataProvider = dataProvider;
this.ownershipService = ownershipService;
this.domDataBroker = domDataBroker;
}
@Override
- public Future<RpcResult<Void>> stressTest(StressTestInput input) {
+ public Future<RpcResult<Void>> stressTest(final StressTestInput input) {
final int inputRate;
final long inputCount;
return Futures.immediateFuture(RpcResultBuilder.<Void>failed()
.withError(ErrorType.PROTOCOL, "invalid rate")
.build());
- } else {
- inputRate = input.getRate();
}
+ inputRate = input.getRate();
if (input.getCount() != null) {
inputCount = input.getCount();
} else {
@Override
- public Future<RpcResult<Void>> registerOwnership(RegisterOwnershipInput input) {
+ public Future<RpcResult<Void>> registerOwnership(final RegisterOwnershipInput input) {
if(registeredListener.compareAndSet(false, true)) {
ownershipService.registerListener(ENTITY_TYPE, ownershipListener);
}
}
@Override
- public Future<RpcResult<Void>> unregisterOwnership(UnregisterOwnershipInput input) {
+ public Future<RpcResult<Void>> unregisterOwnership(final UnregisterOwnershipInput input) {
return RpcResultBuilder.<Void>success().buildFuture();
}
private static class CarEntityOwnershipListener implements EntityOwnershipListener {
@Override
- public void ownershipChanged(EntityOwnershipChange ownershipChange) {
+ public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
LOG_CAR_PROVIDER.info("ownershipChanged: {}", ownershipChange);
}
}
LOG_CAR_PROVIDER.info("Unregistering the CarDataChangeListener(s)");
synchronized (carsDclRegistrations) {
int numListeners = 0;
- for (ListenerRegistration<DataChangeListener> carsDclRegistration : carsDclRegistrations) {
+ for (ListenerRegistration<?> carsDclRegistration : carsDclRegistrations) {
carsDclRegistration.close();
numListeners++;
}
private final DOMDataTreeChangeService domDataTreeChangeService;
private final ActorSystem actorSystem;
- private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
+ private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
new HashMap<>();
- private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
+ private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
private FlappingSingletonService flappingSingletonService;
private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
private IdIntsListener idIntsListener;
- private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
+ private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
private IdIntsDOMDataTreeLIstener idIntsDdtl;
@Override
public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
LOG.debug("write-transactions, input: {}", input);
-
- final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
-
- final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
- writeTransactionsHandler.start(settableFuture);
-
- return settableFuture;
+ return WriteTransactionsHandler.start(domDataBroker, input);
}
@Override
}
@Override
- public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+ public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
return null;
}
}
@Override
- public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
+ public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
return null;
}
}
@Override
- public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
+ public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
return null;
}
@Override
public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
LOG.debug("producer-transactions, input: {}", input);
-
- final ProduceTransactionsHandler handler =
- new ProduceTransactionsHandler(domDataTreeService, input);
-
- final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
- handler.start(settableFuture);
-
- return settableFuture;
+ return ProduceTransactionsHandler.start(domDataTreeService, input);
}
@Override
public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
- final InstanceIdentifier shardPrefix = input.getPrefix();
+ final InstanceIdentifier<?> shardPrefix = input.getPrefix();
if (shardPrefix == null) {
final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
}
- public void setRpcRegistration(BindingAwareBroker.RoutedRpcRegistration<CarPurchaseService> rpcRegistration) {
+ public void setRpcRegistration(final BindingAwareBroker.RoutedRpcRegistration<CarPurchaseService> rpcRegistration) {
this.rpcRegistration = rpcRegistration;
}
@Override
- public Future<RpcResult<Void>> addPerson(AddPersonInput input) {
+ public Future<RpcResult<Void>> addPerson(final AddPersonInput input) {
LOG.info("RPC addPerson : adding person [{}]", input);
PersonBuilder builder = new PersonBuilder(input);
final InstanceIdentifier.InstanceIdentifierBuilder<Person> personIdBuilder =
InstanceIdentifier.<People>builder(People.class)
.child(Person.class, person.getKey());
- final InstanceIdentifier personId = personIdBuilder.build();
+ final InstanceIdentifier<Person> personId = personIdBuilder.build();
// Place entry in data store tree
WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
tx.put(LogicalDatastoreType.CONFIGURATION, personId, person, true);
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.clustering.it.provider.impl;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class AbstractTransactionHandler {
+ private abstract static class Phase {
+ abstract void txSuccess(ListenableFuture<Void> execFuture, long txId);
+
+ abstract void txFailure(ListenableFuture<Void> execFuture, long txId, Throwable cause);
+ }
+
+ private static final class Running extends Phase {
+ private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
+ private Throwable failure;
+
+ void addFuture(final ListenableFuture<Void> execFuture) {
+ futures.add(execFuture);
+ }
+
+ @Override
+ void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+ futures.remove(execFuture);
+ }
+
+ @Override
+ void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+ futures.remove(execFuture);
+ if (failure != null) {
+ failure = cause;
+ }
+ }
+
+ Optional<Throwable> getFailure() {
+ return Optional.ofNullable(failure);
+ }
+ }
+
+ private final class Collecting extends Phase {
+ private final List<ListenableFuture<Void>> futures;
+ private boolean done;
+
+ Collecting(final Collection<ListenableFuture<Void>> futures) {
+ this.futures = new ArrayList<>(futures);
+ }
+
+ @Override
+ void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+ futures.remove(execFuture);
+ if (futures.isEmpty() && !done) {
+ LOG.debug("All futures completed successfully.");
+ runSuccessful(txCounter);
+ }
+ }
+
+ @Override
+ void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+ futures.remove(execFuture);
+ done = true;
+ runFailed(cause);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
+
+ static final int SECOND_AS_NANO = 1000000000;
+ //2^20 as in the model
+ static final int MAX_ITEM = 1048576;
+
+ static final QName ID_INTS =
+ QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
+ static final QName ID =
+ QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
+ static final QName ITEM =
+ QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
+ static final QName NUMBER =
+ QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
+
+ public static final QName ID_INT =
+ QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
+ public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
+ public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
+
+ static final long INIT_TX_TIMEOUT_SECONDS = 125;
+
+ private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
+
+ private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ private final Stopwatch stopwatch = Stopwatch.createStarted();
+ private final long runtimeNanos;
+ private final long delayNanos;
+
+ private ScheduledFuture<?> scheduledFuture;
+ private long txCounter;
+ @GuardedBy("this")
+ private Phase phase;
+
+ AbstractTransactionHandler(final TransactionsParams params) {
+ runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
+ delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond();
+ }
+
+ final synchronized void doStart() {
+ phase = new Running();
+ scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
+ }
+
+ private void execute() {
+ final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ if (elapsed < runtimeNanos) {
+ // Not completed yet: create a transaction and hook it up
+ final long txId = txCounter++;
+ final ListenableFuture<Void> execFuture = execWrite(txId);
+
+ // Ordering is important: we need to add the future before hooking the callback
+ synchronized (this) {
+ ((Running) phase).addFuture(execFuture);
+ }
+ Futures.addCallback(execFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ txSuccess(execFuture, txId);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ txFailure(execFuture, txId, cause);
+ }
+ });
+ } else {
+ startCollection();
+ }
+ }
+
+ private synchronized void startCollection() {
+ scheduledFuture.cancel(false);
+
+ final Running running = (Running) phase;
+ final Optional<Throwable> failure = running.getFailure();
+ if (failure.isPresent()) {
+ executor.shutdown();
+ runFailed(failure.get());
+ return;
+ }
+
+ LOG.debug("Reached maximum run time with {} outstanding futures", running.futures.size());
+ if (running.futures.isEmpty()) {
+ executor.shutdown();
+ runSuccessful(txCounter);
+ return;
+ }
+
+ phase = new Collecting(running.futures);
+ executor.schedule(this::checkCollection, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ executor.shutdown();
+ }
+
+ final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+ LOG.debug("Future #{} completed successfully", txId);
+ phase.txSuccess(execFuture, txId);
+ }
+
+ final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+ LOG.debug("Future #{} failed", txId, cause);
+ phase.txFailure(execFuture, txId, cause);
+ }
+
+ private synchronized void checkCollection() {
+ final Collecting collecting = (Collecting) phase;
+ if (!collecting.done) {
+ final int size = collecting.futures.size();
+ for (int i = 0; i < size; i++) {
+ final ListenableFuture<Void> future = collecting.futures.get(i);
+
+ try {
+ future.get(0, TimeUnit.NANOSECONDS);
+ } catch (final TimeoutException e) {
+ LOG.warn("Future #{}/{} not completed yet", i, size);
+ } catch (final ExecutionException e) {
+ LOG.warn("Future #{}/{} failed", i, size, e.getCause());
+ } catch (final InterruptedException e) {
+ LOG.warn("Interrupted while examining future #{}/{}", i, size, e);
+ }
+ }
+
+ runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
+ }
+ }
+
+ abstract ListenableFuture<Void> execWrite(final long txId);
+
+ abstract void runFailed(Throwable cause);
+
+ abstract void runSuccessful(long allTx);
+
+ abstract void runTimedOut(Exception cause);
+}
package org.opendaylight.controller.clustering.it.provider.impl;
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID;
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID_INT;
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID_INTS;
-import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ITEM;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INT;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INTS;
+import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ITEM;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
final CheckedFuture<Void, TransactionCommitFailedException> ensureFuture = ensureListExists();
Futures.addCallback(ensureFuture, new FutureCallback<Void>() {
@Override
- public void onSuccess(@Nullable Void result) {
+ public void onSuccess(@Nullable final Void result) {
LOG.debug("Initial list write successful.");
future.set(RpcResultBuilder.<Void>success().build());
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.warn("Shard[{}] creation failed:", identifier, throwable);
final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed",
final CheckedFuture<Void, TransactionCommitFailedException> future = tx.submit();
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
- public void onSuccess(@Nullable Void result) {
+ public void onSuccess(@Nullable final Void result) {
try {
LOG.debug("Closing producer for initial list.");
producer.close();
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
//NOOP handled by the caller of this method.
}
});
package org.opendaylight.controller.clustering.it.provider.impl;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.SplittableRandom;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
-import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ProduceTransactionsHandler implements Runnable {
-
+public class ProduceTransactionsHandler extends AbstractTransactionHandler {
private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
- private static final int SECOND_AS_NANO = 1000000000;
- //2^20 as in the model
- private static final int MAX_ITEM = 1048576;
-
- static final QName ID_INTS =
- QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
- public static final QName ID_INT =
- QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
- static final QName ID =
- QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
- static final QName ITEM =
- QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
- private static final QName NUMBER =
- QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number".intern());
-
- public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
- public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
- private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- private final List<ListenableFuture<Void>> futures = new ArrayList<>();
- private final Set<Integer> usedValues = new HashSet<>();
+ private final SettableFuture<RpcResult<ProduceTransactionsOutput>> future = SettableFuture.create();
private final SplittableRandom random = new SplittableRandom();
+ private final Set<Integer> usedValues = new HashSet<>();
+ private final DOMDataTreeIdentifier idListItem;
+ private final DOMDataTreeProducer itemProducer;
- private final DOMDataTreeService domDataTreeService;
- private final long runtimeNanos;
- private final long delayNanos;
- private final String id;
-
- private SettableFuture<RpcResult<ProduceTransactionsOutput>> completionFuture;
- private Stopwatch stopwatch;
-
- private long allTx = 0;
private long insertTx = 0;
private long deleteTx = 0;
- private ScheduledFuture<?> scheduledFuture;
- private DOMDataTreeProducer itemProducer;
- private DOMDataTreeIdentifier idListItem;
-
- public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService,
- final ProduceTransactionsInput input) {
-
- this.domDataTreeService = domDataTreeService;
- runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds());
- delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond();
- id = input.getId();
- }
-
- @Override
- public void run() {
- futures.add(execWrite(futures.size()));
- maybeFinish();
+ private ProduceTransactionsHandler(final DOMDataTreeProducer producer, final DOMDataTreeIdentifier idListItem,
+ final ProduceTransactionsInput input) {
+ super(input);
+ this.itemProducer = Preconditions.checkNotNull(producer);
+ this.idListItem = Preconditions.checkNotNull(idListItem);
}
- public void start(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
- completionFuture = settableFuture;
-
- if (fillInitialList(completionFuture)) {
- stopwatch = Stopwatch.createStarted();
- scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS);
- } else {
- executor.shutdown();
- }
- }
-
- private boolean fillInitialList(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
- LOG.debug("Filling the item list with initial values.");
+ public static ListenableFuture<RpcResult<ProduceTransactionsOutput>> start(
+ final DOMDataTreeService domDataTreeService, final ProduceTransactionsInput input) {
+ final String id = input.getId();
+ LOG.debug("Filling the item list {} with initial values.", id);
final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id));
- itemProducer = domDataTreeService.createProducer(
- Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
+ final DOMDataTreeProducer itemProducer = domDataTreeService.createProducer(
+ Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
final DOMDataTreeWriteCursor cursor =
cursor.write(list.getIdentifier(), list);
cursor.close();
- idListItem = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
- idListWithKey.node(list.getIdentifier()).toOptimized());
-
try {
- tx.submit().checkedGet(125, TimeUnit.SECONDS);
- return true;
+ tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final Exception e) {
LOG.warn("Unable to fill the initial item list.", e);
- settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
- }
- try {
- itemProducer.close();
- } catch (final DOMDataTreeProducerException exception) {
- LOG.warn("Failure while closing producer.", exception);
+ try {
+ itemProducer.close();
+ } catch (final DOMDataTreeProducerException exception) {
+ LOG.warn("Failure while closing producer.", exception);
+ }
+
+ return Futures.immediateFuture(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+ .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
}
- return false;
+
+ final ProduceTransactionsHandler handler = new ProduceTransactionsHandler(itemProducer,
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(list.getIdentifier())
+ .toOptimized()), input);
+ handler.doStart();
+ return handler.future;
}
- private ListenableFuture<Void> execWrite(final int offset) {
+ @Override
+ ListenableFuture<Void> execWrite(final long txId) {
final int i = random.nextInt(MAX_ITEM + 1);
final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem);
- allTx++;
-
final NodeIdentifierWithPredicates entryId = new NodeIdentifierWithPredicates(ITEM, NUMBER, i);
if (usedValues.contains(i)) {
LOG.debug("Deleting item: {}", i);
cursor.close();
- final ListenableFuture<Void> future = tx.submit();
- if (LOG.isDebugEnabled()) {
- Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- LOG.debug("Future #{} completed successfully", offset);
- }
-
- @Override
- public void onFailure(final Throwable cause) {
- LOG.debug("Future #{} failed", offset, cause);
- }
- });
- }
-
- return future;
+ return tx.submit();
}
- private void maybeFinish() {
- final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
- if (elapsed >= runtimeNanos) {
- LOG.debug("Reached max running time, waiting for futures to complete.");
- scheduledFuture.cancel(false);
-
- final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
-
- try {
- // Timeout from cds should be 2 minutes so leave some leeway.
- allFutures.get(125, TimeUnit.SECONDS);
-
- LOG.debug("All futures completed successfully.");
-
- final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
- .setAllTx(allTx)
- .setInsertTx(insertTx)
- .setDeleteTx(deleteTx)
- .build();
- completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
- .withResult(output).build());
- } catch (ExecutionException e) {
- LOG.error("Write transactions failed.", e.getCause());
- completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build());
- } catch (InterruptedException | TimeoutException e) {
- LOG.error("Write transactions failed.", e);
- completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION,
- "Final submit was timed out by the test provider or was interrupted", e).build());
-
- for (int i = 0; i < futures.size(); i++) {
- final ListenableFuture<Void> future = futures.get(i);
+ @Override
+ void runFailed(final Throwable cause) {
+ future.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+ .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build());
+ }
- try {
- future.get(0, TimeUnit.NANOSECONDS);
- } catch (TimeoutException fe) {
- LOG.warn("Future #{}/{} not completed yet", i, futures.size());
- } catch (ExecutionException fe) {
- LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause());
- } catch (InterruptedException fe) {
- LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e);
- }
- }
- } catch (Exception e) {
- LOG.error("Write transactions failed.", e);
- completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
- }
+ @Override
+ void runSuccessful(final long allTx) {
+ final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
+ .setAllTx(allTx)
+ .setInsertTx(insertTx)
+ .setDeleteTx(deleteTx)
+ .build();
+ future.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
+ .withResult(output).build());
+ }
- executor.shutdown();
- try {
- itemProducer.close();
- } catch (final DOMDataTreeProducerException e) {
- LOG.warn("Failure while closing item producer.", e);
- }
- }
+ @Override
+ void runTimedOut(final Exception cause) {
+ future.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
+ .withError(RpcError.ErrorType.APPLICATION,
+ "Final submit was timed out by the test provider or was interrupted", cause).build());
}
}
import com.google.common.util.concurrent.Futures;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
package org.opendaylight.controller.clustering.it.provider.impl;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
-import java.util.List;
import java.util.Set;
import java.util.SplittableRandom;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
-import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class WriteTransactionsHandler implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
- private static final int SECOND_AS_NANO = 1000000000;
- //2^20 as in the model
- private static final int MAX_ITEM = 1048576;
-
- private static final QName ID_INTS =
- QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
- private static final QName ID_INT =
- QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
- private static final QName ID =
- QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
- private static final QName ITEM =
- QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
- private static final QName NUMBER =
- QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
-
- public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
- public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
-
- private final WriteTransactionsInput input;
-
- private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- private final List<ListenableFuture<Void>> futures = new ArrayList<>();
- private final Set<Integer> usedValues = new HashSet<>();
-
- private RandomnessProvider random;
- private TxProvider txProvider;
+public abstract class WriteTransactionsHandler extends AbstractTransactionHandler {
+ private static final class Chained extends WriteTransactionsHandler implements TransactionChainListener {
+ private final SplittableRandom random = new SplittableRandom();
+ private final DOMTransactionChain transactionChain;
- private final DOMDataBroker domDataBroker;
- private final Long runtimeNanos;
- private final Long delayNanos;
- private final String id;
+ Chained(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem,
+ final WriteTransactionsInput input) {
+ super(idListItem, input);
+ transactionChain = dataBroker.createTransactionChain(this);
+ }
- private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
- private Stopwatch stopwatch;
+ @Override
+ DOMDataWriteTransaction createTransaction() {
+ return transactionChain.newWriteOnlyTransaction();
+ }
- private long allTx = 0;
- private long insertTx = 0;
- private long deleteTx = 0;
- private ScheduledFuture<?> scheduledFuture;
- private YangInstanceIdentifier idListItem;
+ @Override
+ int nextInt(final int bound) {
+ return random.nextInt(bound);
+ }
- public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
- this.domDataBroker = domDataBroker;
- this.input = input;
+ @Override
+ public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
+ final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+ LOG.warn("Transaction chain failed.", cause);
+ completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+ .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
+ }
- runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds());
- delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond();
- id = input.getId();
+ @Override
+ public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+ LOG.debug("Transaction chain closed successfully.");
+ }
}
- @Override
- public void run() {
- futures.add(execWrite(futures.size()));
- maybeFinish();
- }
+ private static final class Simple extends WriteTransactionsHandler {
+ private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
+ private final SplittableRandom random = new SplittableRandom();
+ private final DOMDataBroker dataBroker;
- public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
- LOG.debug("Starting write-transactions.");
+ Simple(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem,
+ final WriteTransactionsInput input) {
+ super(idListItem, input);
+ this.dataBroker = Preconditions.checkNotNull(dataBroker);
+ }
- if (input.isChainedTransactions()) {
- txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
- random = new BasicProvider();
- } else {
- txProvider = new DataBrokerBackedProvider(domDataBroker);
- random = new NonConflictingProvider();
+ @Override
+ DOMDataWriteTransaction createTransaction() {
+ return dataBroker.newWriteOnlyTransaction();
}
- if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
- stopwatch = Stopwatch.createStarted();
- completionFuture = settableFuture;
- scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS);
- } else {
- executor.shutdown();
+ @Override
+ int nextInt(final int bound) {
+ int nextInt;
+ do {
+ nextInt = random.nextInt(bound);
+ } while (previousNumbers.contains(nextInt));
+
+ if (previousNumbers.size() > 100000) {
+ previousNumbers.iterator().remove();
+ }
+ previousNumbers.add(nextInt);
+
+ return nextInt;
}
}
- private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
+ private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
+
+ final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture = SettableFuture.create();
+ private final Set<Integer> usedValues = new HashSet<>();
+ private final YangInstanceIdentifier idListItem;
+
+ private long insertTx = 0;
+ private long deleteTx = 0;
+
+ WriteTransactionsHandler(final YangInstanceIdentifier idListItem, final WriteTransactionsInput input) {
+ super(input);
+ this.idListItem = Preconditions.checkNotNull(idListItem);
+ }
+
+ public static ListenableFuture<RpcResult<WriteTransactionsOutput>> start(final DOMDataBroker domDataBroker,
+ final WriteTransactionsInput input) {
+ LOG.debug("Starting write-transactions.");
+
+ final String id = input.getId();
+ final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
+ .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
+ .build();
+ final YangInstanceIdentifier idListItem = ID_INT_YID.node(entry.getIdentifier());
final ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
.withNodeIdentifier(new NodeIdentifier(ID_INTS))
.withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build())
.build();
- DOMDataWriteTransaction tx = txProvider.createTransaction();
+ DOMDataWriteTransaction tx = domDataBroker.newWriteOnlyTransaction();
// write only the top list
tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode);
try {
- tx.submit().checkedGet(125, TimeUnit.SECONDS);
+ tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final OptimisticLockFailedException e) {
// when multiple write-transactions are executed concurrently we need to ignore this.
// If we get optimistic lock here it means id-ints already exists and we can continue.
LOG.debug("Got an optimistic lock when writing initial top level list element.", e);
} catch (final TransactionCommitFailedException | TimeoutException e) {
LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
- settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+ return Futures.immediateFuture(RpcResultBuilder.<WriteTransactionsOutput>failed()
.withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
- return false;
}
- final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
- .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
- .build();
-
- idListItem = ID_INT_YID.node(entry.getIdentifier());
- tx = txProvider.createTransaction();
+ tx = domDataBroker.newWriteOnlyTransaction();
tx.merge(LogicalDatastoreType.CONFIGURATION, idListItem, entry);
try {
- tx.submit().checkedGet(125, TimeUnit.SECONDS);
- return true;
+ tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final Exception e) {
LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
- settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+ return Futures.immediateFuture(RpcResultBuilder.<WriteTransactionsOutput>failed()
.withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
- return false;
}
- }
- private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
LOG.debug("Filling the item list with initial values.");
final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
final YangInstanceIdentifier itemListId = idListItem.node(ITEM);
- final DOMDataWriteTransaction tx = txProvider.createTransaction();
+ tx = domDataBroker.newWriteOnlyTransaction();
tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
try {
- tx.submit().checkedGet(125, TimeUnit.SECONDS);
- return true;
+ tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final Exception e) {
LOG.warn("Unable to fill the initial item list.", e);
- settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+ return Futures.immediateFuture(RpcResultBuilder.<WriteTransactionsOutput>failed()
.withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
- return false;
}
+
+ final WriteTransactionsHandler handler;
+ if (input.isChainedTransactions()) {
+ handler = new Chained(domDataBroker, idListItem, input);
+ } else {
+ handler = new Simple(domDataBroker, idListItem, input);
+ }
+
+ handler.doStart();
+ return handler.completionFuture;
}
- private ListenableFuture<Void> execWrite(final int offset) {
- final int i = random.nextInt(MAX_ITEM + 1);
+ @Override
+ ListenableFuture<Void> execWrite(final long txId) {
+ final int i = nextInt(MAX_ITEM + 1);
final YangInstanceIdentifier entryId =
idListItem.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
- final DOMDataWriteTransaction tx = txProvider.createTransaction();
- allTx++;
+ final DOMDataWriteTransaction tx = createTransaction();
if (usedValues.contains(i)) {
LOG.debug("Deleting item: {}", i);
usedValues.add(i);
}
- final ListenableFuture<Void> future = tx.submit();
- if (LOG.isDebugEnabled()) {
- Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- LOG.debug("Future #{} completed successfully", offset);
- }
-
- @Override
- public void onFailure(final Throwable cause) {
- LOG.debug("Future #{} failed", offset, cause);
- }
- });
- }
-
- return future;
- }
-
- private void maybeFinish() {
- final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
- if (elapsed >= runtimeNanos) {
- LOG.debug("Reached max running time, waiting for futures to complete.");
- scheduledFuture.cancel(false);
-
- final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
-
- try {
- // Timeout from cds should be 2 minutes so leave some leeway.
- allFutures.get(125, TimeUnit.SECONDS);
-
- LOG.debug("All futures completed successfully.");
-
- final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
- .setAllTx(allTx)
- .setInsertTx(insertTx)
- .setDeleteTx(deleteTx)
- .build();
-
- completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
- .withResult(output).build());
-
- executor.shutdown();
- } catch (final ExecutionException e) {
- LOG.error("Write transactions failed.", e.getCause());
-
- completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build());
- } catch (InterruptedException | TimeoutException e) {
- LOG.error("Write transactions failed.", e);
-
- completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION,
- "Final submit was timed out by the test provider or was interrupted", e).build());
-
- for (int i = 0; i < futures.size(); i++) {
- final ListenableFuture<Void> future = futures.get(i);
-
- try {
- future.get(0, TimeUnit.NANOSECONDS);
- } catch (final TimeoutException fe) {
- LOG.warn("Future #{}/{} not completed yet", i, futures.size());
- } catch (final ExecutionException fe) {
- LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause());
- } catch (final InterruptedException fe) {
- LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e);
- }
- }
- } catch (Exception exception) {
- LOG.error("Write transactions failed.", exception);
- completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build());
-
- executor.shutdown();
- }
- }
- }
-
- private interface RandomnessProvider {
- int nextInt(int bound);
- }
-
- private static class NonConflictingProvider implements RandomnessProvider {
-
- private final SplittableRandom random = new SplittableRandom();
- private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
-
- @Override
- public int nextInt(int bound) {
- int nextInt;
- do {
- nextInt = random.nextInt(bound);
- } while (previousNumbers.contains(nextInt));
-
- if (previousNumbers.size() > 100000) {
- previousNumbers.iterator().remove();
- }
- previousNumbers.add(nextInt);
-
- return nextInt;
- }
+ return tx.submit();
}
- private static class BasicProvider implements RandomnessProvider {
-
- private final SplittableRandom random = new SplittableRandom();
-
- @Override
- public int nextInt(int bound) {
- return random.nextInt(bound);
- }
- }
-
- private interface TxProvider {
-
- DOMDataWriteTransaction createTransaction();
+ @Override
+ void runFailed(final Throwable cause) {
+ completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+ .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build());
}
- private static class TxChainBackedProvider implements TxProvider {
-
- private final DOMTransactionChain transactionChain;
-
- TxChainBackedProvider(final DOMDataBroker dataBroker,
- final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
- final ScheduledExecutorService executor) {
-
- transactionChain =
- dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
- }
+ @Override
+ void runSuccessful(final long allTx) {
+ final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
+ .setAllTx(allTx)
+ .setInsertTx(insertTx)
+ .setDeleteTx(deleteTx)
+ .build();
- @Override
- public DOMDataWriteTransaction createTransaction() {
- return transactionChain.newWriteOnlyTransaction();
- }
+ completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
+ .withResult(output).build());
}
- private static class DataBrokerBackedProvider implements TxProvider {
-
- private final DOMDataBroker dataBroker;
-
- DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
- this.dataBroker = dataBroker;
- }
-
- @Override
- public DOMDataWriteTransaction createTransaction() {
- return dataBroker.newWriteOnlyTransaction();
- }
+ @Override
+ void runTimedOut(final Exception cause) {
+ completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
+ .withError(RpcError.ErrorType.APPLICATION,
+ "Final submit was timed out by the test provider or was interrupted", cause).build());
}
- private static class TestChainListener implements TransactionChainListener {
-
- private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
- private final ScheduledExecutorService executor;
-
- TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
- final ScheduledExecutorService executor) {
-
- this.resultFuture = resultFuture;
- this.executor = executor;
- }
-
- @Override
- public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
- final AsyncTransaction<?, ?> transaction,
- final Throwable cause) {
- LOG.warn("Transaction chain failed.", cause);
- resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
-
- executor.shutdown();
- }
+ abstract DOMDataWriteTransaction createTransaction();
- @Override
- public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
- LOG.debug("Transaction chain closed successfully.");
- }
- }
+ abstract int nextInt(int bound);
}
private final String id;
- private AtomicLong localNumber = new AtomicLong();
- private AtomicLong allNot = new AtomicLong();
- private AtomicLong idNot = new AtomicLong();
- private AtomicLong errNot = new AtomicLong();
+ private final AtomicLong localNumber = new AtomicLong();
+ private final AtomicLong allNot = new AtomicLong();
+ private final AtomicLong idNot = new AtomicLong();
+ private final AtomicLong errNot = new AtomicLong();
public YnlListener(final String id) {
Preconditions.checkNotNull(id);
if (notification.getId().equals(id)) {
idNot.incrementAndGet();
- localNumber.getAndUpdate((value -> {
+ localNumber.getAndUpdate(value -> {
if (notification.getSequenceNumber() - value == 1) {
return value + 1;
- } else {
- errNot.getAndIncrement();
- return value;
}
- }));
+ errNot.getAndIncrement();
+ return value;
+ });
}
}