From 74175c48bb2b3ee786108bdda8e665484080b7f5 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Wed, 3 May 2017 15:21:00 +0200 Subject: [PATCH] Move initial list creation to create-prefix-shard. This move the initial list population of produce-transactions to create-prefix-shard rpc with 3 hardcoded prefixes(prefix-1,prefix-2,prefix-3) so that csit suites can populate the id-int list just once when the shard is created and produce-transactions can now run parallely on multiple entries from multiple nodes. Change-Id: If70990c0e217cd68027ae960a7545c69acf52cdb Signed-off-by: Tomas Cere --- .../provider/MdsalLowLevelTestProvider.java | 3 +- .../it/provider/impl/PrefixShardHandler.java | 96 ++++++++++++++++++- .../impl/ProduceTransactionsHandler.java | 67 +++---------- 3 files changed, 111 insertions(+), 55 deletions(-) diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index 8ebac1c01e..8f67f136ed 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -160,7 +160,8 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this); - prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer); + prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService, + bindingNormalizedNodeSerializer); } @Override diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java index 22e7700d5c..8f711b3337 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java @@ -8,6 +8,13 @@ package org.opendaylight.controller.clustering.it.provider.impl; +import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID; +import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID_INT; +import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ID_INTS; +import static org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler.ITEM; + +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -16,36 +23,55 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.sharding.DistributedShardFactory; import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeService; import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PrefixShardHandler { private static final Logger LOG = LoggerFactory.getLogger(PrefixShardHandler.class); + private static final int MAX_PREFIX = 4; + private static final String PREFIX_TEMPLATE = "prefix-"; private final DistributedShardFactory shardFactory; + private final DOMDataTreeService domDataTreeService; private final BindingNormalizedNodeSerializer serializer; private final Map registrations = Collections.synchronizedMap(new HashMap<>()); public PrefixShardHandler(final DistributedShardFactory shardFactory, + final DOMDataTreeService domDataTreeService, final BindingNormalizedNodeSerializer serializer) { this.shardFactory = shardFactory; + this.domDataTreeService = domDataTreeService; this.serializer = serializer; } @@ -64,7 +90,25 @@ public class PrefixShardHandler { completionStage.thenAccept(registration -> { LOG.debug("Shard[{}] created successfully.", identifier); registrations.put(identifier, registration); - future.set(RpcResultBuilder.success().build()); + + final CheckedFuture ensureFuture = ensureListExists(); + Futures.addCallback(ensureFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + LOG.debug("Initial list write successful."); + future.set(RpcResultBuilder.success().build()); + } + + @Override + public void onFailure(Throwable throwable) { + LOG.warn("Shard[{}] creation failed:", identifier, throwable); + + final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed", + "Shard creation failed", "cluster-test-app", "", throwable); + future.set(RpcResultBuilder.failed().withRpcError(error).build()); + } + }); + }); completionStage.exceptionally(throwable -> { LOG.warn("Shard[{}] creation failed:", identifier, throwable); @@ -111,4 +155,54 @@ public class PrefixShardHandler { return future; } + + private CheckedFuture ensureListExists() { + + final CollectionNodeBuilder mapBuilder = ImmutableNodes.mapNodeBuilder(ID_INT); + + // hardcoded initial list population for parallel produce-transactions testing on multiple nodes + for (int i = 1; i < MAX_PREFIX; i++) { + mapBuilder.withChild( + ImmutableNodes.mapEntryBuilder(ID_INT, ID, PREFIX_TEMPLATE + i) + .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) + .build()); + } + final MapNode mapNode = mapBuilder.build(); + + final ContainerNode containerNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(ID_INTS)) + .withChild(mapNode) + .build(); + + final DOMDataTreeProducer producer = domDataTreeService.createProducer(Collections.singleton( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY))); + + final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false); + + final DOMDataTreeWriteCursor cursor = + tx.createCursor(new DOMDataTreeIdentifier( + LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY)); + + cursor.merge(containerNode.getIdentifier(), containerNode); + cursor.close(); + + final CheckedFuture future = tx.submit(); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + try { + LOG.debug("Closing producer for initial list."); + producer.close(); + } catch (DOMDataTreeProducerException e) { + LOG.warn("Error while closing producer.", e); + } + } + + @Override + public void onFailure(Throwable throwable) { + //NOOP handled by the caller of this method. + } + }); + return future; + } } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java index 917f685769..8aa2dfb03a 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java @@ -58,13 +58,13 @@ public class ProduceTransactionsHandler implements Runnable { //2^20 as in the model private static final int MAX_ITEM = 1048576; - private static final QName ID_INTS = + static final QName ID_INTS = QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints"); - private static final QName ID_INT = + static final QName ID_INT = QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int"); - private static final QName ID = + static final QName ID = QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id"); - private static final QName ITEM = + static final QName ITEM = QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item"); private static final QName NUMBER = QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number"); @@ -90,8 +90,8 @@ public class ProduceTransactionsHandler implements Runnable { private long insertTx = 0; private long deleteTx = 0; private ScheduledFuture scheduledFuture; - private YangInstanceIdentifier idListWithKey; private DOMDataTreeProducer itemProducer; + private YangInstanceIdentifier idListWithKey; public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService, final ProduceTransactionsInput input) { @@ -115,7 +115,7 @@ public class ProduceTransactionsHandler implements Runnable { public void start(final SettableFuture> settableFuture) { completionFuture = settableFuture; - if (ensureListExists(completionFuture) && fillInitialList(completionFuture)) { + if (fillInitialList(completionFuture)) { startTime = System.nanoTime(); scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS); } else { @@ -123,53 +123,6 @@ public class ProduceTransactionsHandler implements Runnable { } } - private boolean ensureListExists(final SettableFuture> settableFuture) { - - final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id) - .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build()) - .build(); - final MapNode mapNode = - ImmutableNodes.mapNodeBuilder(ID_INT) - .withChild(entry) - .build(); - - final ContainerNode containerNode = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(ID_INTS)) - .withChild(mapNode) - .build(); - - final DOMDataTreeProducer producer = domDataTreeService.createProducer(Collections.singleton( - new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY))); - - final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false); - - final DOMDataTreeWriteCursor cursor = - tx.createCursor(new DOMDataTreeIdentifier( - LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY)); - - idListWithKey = ID_INT_YID.node(entry.getIdentifier()); - - cursor.merge(containerNode.getIdentifier(), containerNode); - cursor.close(); - - try { - tx.submit().checkedGet(); - } catch (TransactionCommitFailedException e) { - LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e); - settableFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); - return false; - } finally { - try { - producer.close(); - } catch (DOMDataTreeProducerException e) { - LOG.warn("Error while closing producer.", e); - } - } - - return true; - } - private boolean fillInitialList(final SettableFuture> settableFuture) { LOG.debug("Filling the item list with initial values."); @@ -179,6 +132,8 @@ public class ProduceTransactionsHandler implements Runnable { mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i)); } + idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id)); + itemProducer = domDataTreeService.createProducer( Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey))); @@ -250,6 +205,12 @@ public class ProduceTransactionsHandler implements Runnable { .setDeleteTx(deleteTx) .build(); + try { + itemProducer.close(); + } catch (final DOMDataTreeProducerException e) { + LOG.warn("Failure while closing item producer.", e); + } + completionFuture.set(RpcResultBuilder.success() .withResult(output).build()); -- 2.36.6