X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FProduceTransactionsHandler.java;h=232391209b506d076fe202641a3e4fcdcdf1b004;hp=ee46a74746c56c52451fa091746a9ed1e1b566af;hb=3859df9beca8f13f1ff2b2744ed3470a1715bec3;hpb=1cc64603a97e49d153e16328b3b4a7d0926e086d diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java index ee46a74746..232391209b 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java @@ -5,27 +5,24 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.clustering.it.provider.impl; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.SplittableRandom; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; +import java.util.concurrent.TimeoutException; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; @@ -35,225 +32,131 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ProduceTransactionsHandler implements Runnable { - +public final class ProduceTransactionsHandler extends AbstractTransactionHandler { private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class); - private static final int SECOND_AS_NANO = 1000000000; - //2^20 as in the model - private static final int MAX_ITEM = 1048576; - - private static final QName ID_INTS = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints"); - private static final QName ID = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id"); - private static final QName ITEM = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item"); - private static final QName NUMBER = - QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number"); - public static final YangInstanceIdentifier ID_INTS_YID = - YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(ID_INTS)); - - private final DOMDataTreeService domDataTreeService; - - private final long timeToTake; - private final long delay; - private final String id; - - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final ArrayList> futures = new ArrayList<>(); - private final Set usedValues = new HashSet<>(); + private final SettableFuture> future = SettableFuture.create(); private final SplittableRandom random = new SplittableRandom(); + private final Set usedValues = new HashSet<>(); + private final DOMDataTreeIdentifier idListItem; + private final DOMDataTreeProducer itemProducer; - private long startTime; - private SettableFuture> completionFuture; - - private long allTx = 0; private long insertTx = 0; private long deleteTx = 0; - private ScheduledFuture scheduledFuture; - private YangInstanceIdentifier idListWithKey; - private DOMDataTreeProducer itemProducer; - - public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService, - final ProduceTransactionsInput input) { - - this.domDataTreeService = domDataTreeService; - - timeToTake = input.getSeconds() * SECOND_AS_NANO; - delay = SECOND_AS_NANO / input.getTransactionsPerSecond(); - id = input.getId(); - } - - @Override - public void run() { - final long current = System.nanoTime(); - - futures.add(execWrite()); - - maybeFinish(current); - } - - public void start(final SettableFuture> settableFuture) { - - if (ensureListExists(completionFuture) && fillInitialList(completionFuture)) { - startTime = System.nanoTime(); - completionFuture = settableFuture; - scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS); - } else { - executor.shutdown(); - } - } - - private boolean ensureListExists(final SettableFuture> settableFuture) { - - final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id) - .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) - .build(); - final MapNode mapNode = - ImmutableNodes.mapNodeBuilder(ID_INTS) - .withChild(entry) - .build(); - - final DOMDataTreeProducer producer = domDataTreeService.createProducer(Collections.singleton( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY))); - - final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false); - - final DOMDataTreeWriteCursor cursor = - tx.createCursor(new DOMDataTreeIdentifier( - LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY)); - - idListWithKey = ID_INTS_YID.node(entry.getIdentifier()); - - cursor.merge(mapNode.getIdentifier(), mapNode); - cursor.close(); - try { - tx.submit().checkedGet(); - } catch (TransactionCommitFailedException e) { - LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); - settableFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; - } finally { - try { - producer.close(); - } catch (DOMDataTreeProducerException e) { - LOG.warn("Error while closing producer.", e); - } - } - - return true; + private ProduceTransactionsHandler(final DOMDataTreeProducer producer, final DOMDataTreeIdentifier idListItem, + final ProduceTransactionsInput input) { + super(input); + this.itemProducer = requireNonNull(producer); + this.idListItem = requireNonNull(idListItem); } - private boolean fillInitialList(final SettableFuture> settableFuture) { - LOG.debug("Filling the item list with initial values."); + public static ListenableFuture> start( + final DOMDataTreeService domDataTreeService, final ProduceTransactionsInput input) { + final String id = input.getId(); + LOG.debug("Filling the item list {} with initial values.", id); - final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM); - for (int i = 0; i < MAX_ITEM / 2; i++) { - usedValues.add(i); - mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i)); - } + final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(NodeIdentifierWithPredicates.of(ID_INT, ID, id)); - itemProducer = domDataTreeService.createProducer( - Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); + final DOMDataTreeProducer itemProducer = domDataTreeService.createProducer( + Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); final DOMDataTreeWriteCursor cursor = tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)); - final MapNode list = mapBuilder.build(); + final MapNode list = ImmutableNodes.mapNodeBuilder(ITEM).build(); cursor.write(list.getIdentifier(), list); cursor.close(); try { - tx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { + tx.commit().get(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.warn("Unable to fill the initial item list.", e); - settableFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; + closeProducer(itemProducer); + + return Futures.immediateFuture(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); } - return true; + final ProduceTransactionsHandler handler = new ProduceTransactionsHandler(itemProducer, + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(list.getIdentifier()) + .toOptimized()), input); + // It is handler's responsibility to close itemProducer when the work is finished. + handler.doStart(); + return handler.future; } - private CheckedFuture execWrite() { - final int i = random.nextInt(MAX_ITEM + 1); - - final YangInstanceIdentifier entryId = - idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i)); + private static void closeProducer(final DOMDataTreeProducer producer) { + try { + producer.close(); + } catch (final DOMDataTreeProducerException exception) { + LOG.warn("Failure while closing producer.", exception); + } + } + @Override + FluentFuture execWrite(final long txId) { + final int i = random.nextInt(MAX_ITEM + 1); final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false); - final DOMDataTreeWriteCursor cursor = tx.createCursor( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(ITEM))); - allTx++; + final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem); + final NodeIdentifierWithPredicates entryId = NodeIdentifierWithPredicates.of(ITEM, NUMBER, i); if (usedValues.contains(i)) { LOG.debug("Deleting item: {}", i); deleteTx++; - cursor.delete(entryId.getLastPathArgument()); + cursor.delete(entryId); usedValues.remove(i); } else { LOG.debug("Inserting item: {}", i); insertTx++; - final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i); - cursor.write(entryId.getLastPathArgument(), entry); + + final MapEntryNode entry = ImmutableNodes.mapEntryBuilder().withNodeIdentifier(entryId) + .withChild(ImmutableNodes.leafNode(NUMBER, i)).build(); + cursor.write(entryId, entry); usedValues.add(i); } cursor.close(); - return tx.submit(); + return tx.commit(); } - private void maybeFinish(final long current) { - if ((current - startTime) > timeToTake) { - LOG.debug("Reached max running time, waiting for futures to complete."); - scheduledFuture.cancel(false); - - final ListenableFuture> allFutures = Futures.allAsList(futures); - - Futures.addCallback(allFutures, new FutureCallback>() { - @Override - public void onSuccess(@Nullable final List result) { - LOG.debug("All futures completed successfully."); - - final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder() - .setAllTx(allTx) - .setInsertTx(insertTx) - .setDeleteTx(deleteTx) - .build(); - - completionFuture.set(RpcResultBuilder.success() - .withResult(output).build()); - - executor.shutdown(); - } + @Override + void runFailed(final Throwable cause, final long txId) { + closeProducer(itemProducer); + future.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "Commit failed for tx # " + txId, cause).build()); + } - @Override - public void onFailure(final Throwable t) { - LOG.error("Write transactions failed.", t); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build()); + @Override + void runSuccessful(final long allTx) { + closeProducer(itemProducer); + final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder() + .setAllTx(allTx) + .setInsertTx(insertTx) + .setDeleteTx(deleteTx) + .build(); + future.set(RpcResultBuilder.success() + .withResult(output).build()); + } - executor.shutdown(); - } - }); - } + @Override + void runTimedOut(final String cause) { + closeProducer(itemProducer); + future.set(RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, cause).build()); } }