X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FWriteTransactionsHandler.java;h=b0503b240611176458732b95194681d6e598d8dc;hb=3859df9beca8f13f1ff2b2744ed3470a1715bec3;hp=797e252a914a6cdcd0a3811e1dcae9a4c12a2c6b;hpb=7d53e7984cfbd69fdf78ad07112974aa3e56fc55;p=controller.git diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java index 797e252a91..b0503b2406 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java @@ -5,19 +5,20 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.clustering.it.provider.impl; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; +import static java.util.Objects.requireNonNull; + import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Set; import java.util.SplittableRandom; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; @@ -68,9 +69,9 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle @Override public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, final Throwable cause) { - LOG.warn("Transaction chain failed.", cause); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build()); + // This is expected to happen frequently in isolation testing. + LOG.debug("Transaction chain failed.", cause); + // Do not return RPC here, rely on transaction failure to call runFailed. } @Override @@ -87,7 +88,7 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle Simple(final DOMDataBroker dataBroker, final YangInstanceIdentifier idListItem, final WriteTransactionsInput input) { super(idListItem, input); - this.dataBroker = Preconditions.checkNotNull(dataBroker); + this.dataBroker = requireNonNull(dataBroker); } @Override @@ -114,20 +115,20 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class); final SettableFuture> completionFuture = SettableFuture.create(); - private final Set usedValues = new HashSet<>(); + private final Set usedValues = ConcurrentHashMap.newKeySet(); private final YangInstanceIdentifier idListItem; - private long insertTx = 0; - private long deleteTx = 0; + private final AtomicLong insertTx = new AtomicLong(); + private final AtomicLong deleteTx = new AtomicLong(); WriteTransactionsHandler(final YangInstanceIdentifier idListItem, final WriteTransactionsInput input) { super(input); - this.idListItem = Preconditions.checkNotNull(idListItem); + this.idListItem = requireNonNull(idListItem); } public static ListenableFuture> start(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) { - LOG.debug("Starting write-transactions."); + LOG.info("Starting write transactions with input {}", input); final String id = input.getId(); final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id) @@ -150,20 +151,22 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle // If we get optimistic lock here it means id-ints already exists and we can continue. LOG.debug("Got an optimistic lock when writing initial top level list element.", e); } catch (final TransactionCommitFailedException | TimeoutException e) { - LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); - return Futures.immediateFuture(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); + LOG.error("Error writing top-level path {}: {}", ID_INTS_YID, containerNode, e); + return RpcResultBuilder.failed().withError(RpcError.ErrorType.APPLICATION, + String.format("Could not start write transactions - error writing top-level path %s: %s", + ID_INTS_YID, containerNode), e).buildFuture(); } tx = domDataBroker.newWriteOnlyTransaction(); tx.merge(LogicalDatastoreType.CONFIGURATION, idListItem, entry); try { - tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } catch (final Exception e) { - LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); - return Futures.immediateFuture(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); + tx.submit().get(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.error("Error writing top-level path {}: {}", idListItem, entry, e); + return RpcResultBuilder.failed().withError(RpcError.ErrorType.APPLICATION, + String.format("Could not start write transactions - error writing list entry path %s: %s", + idListItem, entry), e).buildFuture(); } LOG.debug("Filling the item list with initial values."); @@ -172,14 +175,16 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle final YangInstanceIdentifier itemListId = idListItem.node(ITEM); tx = domDataBroker.newWriteOnlyTransaction(); - tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build()); + final MapNode itemListNode = mapBuilder.build(); + tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, itemListNode); try { - tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } catch (final Exception e) { - LOG.warn("Unable to fill the initial item list.", e); - return Futures.immediateFuture(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); + tx.submit().get(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.error("Error filling initial item list path {}: {}", itemListId, itemListNode, e); + return RpcResultBuilder.failed().withError(RpcError.ErrorType.APPLICATION, + String.format("Could not start write transactions - error filling initial item list path %s: %s", + itemListId, itemListNode), e).buildFuture(); } final WriteTransactionsHandler handler; @@ -190,6 +195,8 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle } handler.doStart(); + + LOG.info("Write transactions successfully started"); return handler.completionFuture; } @@ -198,19 +205,19 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle final int i = nextInt(MAX_ITEM + 1); final YangInstanceIdentifier entryId = - idListItem.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i)); + idListItem.node(ITEM).node(YangInstanceIdentifier.NodeIdentifierWithPredicates.of(ITEM, NUMBER, i)); final DOMDataWriteTransaction tx = createTransaction(); if (usedValues.contains(i)) { LOG.debug("Deleting item: {}", i); - deleteTx++; + deleteTx.incrementAndGet(); tx.delete(LogicalDatastoreType.CONFIGURATION, entryId); usedValues.remove(i); } else { LOG.debug("Inserting item: {}", i); - insertTx++; + insertTx.incrementAndGet(); final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i); tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry); usedValues.add(i); @@ -220,17 +227,17 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle } @Override - void runFailed(final Throwable cause) { + void runFailed(final Throwable cause, final long txId) { completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build()); + .withError(RpcError.ErrorType.APPLICATION, "Commit failed for tx # " + txId, cause).build()); } @Override void runSuccessful(final long allTx) { final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder() .setAllTx(allTx) - .setInsertTx(insertTx) - .setDeleteTx(deleteTx) + .setInsertTx(insertTx.get()) + .setDeleteTx(deleteTx.get()) .build(); completionFuture.set(RpcResultBuilder.success() @@ -238,10 +245,9 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle } @Override - void runTimedOut(final Exception cause) { + void runTimedOut(final String cause) { completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, - "Final submit was timed out by the test provider or was interrupted", cause).build()); + .withError(RpcError.ErrorType.APPLICATION, cause).build()); } abstract DOMDataWriteTransaction createTransaction();