Adjust to mdsal CursorAware API changes 79/74479/3
authorTom Pantelis <tompantelis@gmail.com>
Wed, 25 Jul 2018 22:26:30 +0000 (18:26 -0400)
committerRobert Varga <nite@hq.sk>
Thu, 26 Jul 2018 17:26:26 +0000 (17:26 +0000)
Corresponding mdsal patch: https://git.opendaylight.org/gerrit/#/c/74478/

Change-Id: I82041945b1f15c492f77d65c065068a6fab3878a
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/ProduceTransactionsHandler.java

index 21ca10b5c222f9d609c8857fed02cd1a7b009c91..a907724f552e9a386f0454f8c6edf4368d99d8e1 100644 (file)
@@ -172,7 +172,7 @@ public class DistributedShardFrontendTest {
         assertEquals(createInnerMapNode(1), actualInnerListNode);
 
         txCursor.close();
-        tx.submit().checkedGet();
+        tx.commit().get();
 
         verify(commitCohort, times(2)).canCommit();
         verify(commitCohort, times(2)).preCommit();
index d977f9346288fc722708fb146cc48c4f89c08265..a3c04bdd4b6f40dae6b41312d4e3b71001686289 100644 (file)
@@ -285,7 +285,7 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
         cursor.close();
         LOG.warn("Got to pre submit");
 
-        tx.submit().checkedGet();
+        tx.commit().get();
 
         shardRegistration.close().toCompletableFuture().get();
 
index d35199c1ce756e59a531a6e159719c42e7ba9ab6..6eada08acb33945602ca595d7c3836ad72a2fb3d 100644 (file)
@@ -30,8 +30,8 @@ import akka.actor.Props;
 import akka.cluster.Cluster;
 import akka.testkit.javadsl.TestKit;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.typesafe.config.ConfigFactory;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -72,7 +72,6 @@ import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.Dist
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
@@ -205,7 +204,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
         cursor.write(test.getIdentifier(), test);
         cursor.close();
 
-        tx.submit().checkedGet();
+        tx.commit().get();
     }
 
     @Test
@@ -235,7 +234,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
         cursor.close();
         LOG.debug("Got to pre submit");
 
-        tx.submit().checkedGet();
+        tx.commit().get();
 
         final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class);
         doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap());
@@ -322,9 +321,9 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
 
         cursor.write(new NodeIdentifier(TestModel.INNER_LIST_QNAME), innerList);
         cursor.close();
-        tx.submit().checkedGet();
+        tx.commit().get();
 
-        final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
+        final ArrayList<ListenableFuture<?>> futures = new ArrayList<>();
         for (int i = 0; i < 1000; i++) {
             final Collection<MapEntryNode> innerListMapEntries = createInnerListMapEntries(1000, "run-" + i);
             for (final MapEntryNode innerListMapEntry : innerListMapEntries) {
@@ -334,11 +333,11 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
                                 oid1.node(new NodeIdentifier(TestModel.INNER_LIST_QNAME))));
                 cursor1.write(innerListMapEntry.getIdentifier(), innerListMapEntry);
                 cursor1.close();
-                futures.add(tx1.submit());
+                futures.add(tx1.commit());
             }
         }
 
-        futures.get(futures.size() - 1).checkedGet();
+        futures.get(futures.size() - 1).get();
 
         final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class);
         doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap());
@@ -407,7 +406,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
         cursor.write(testNode.getIdentifier(), testNode);
 
         cursor.close();
-        transaction.submit().checkedGet();
+        transaction.commit().get();
 
         final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class);
         doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap());
@@ -422,7 +421,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
         cursor.write(wholeList.getIdentifier(), wholeList);
         cursor.close();
 
-        transaction.submit().checkedGet();
+        transaction.commit().get();
 
         leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID),
                 true, Collections.emptyList());
index 661db640923e1deaf7d3db291807505bdce963d3..90cdce32dd136737a2d9d230a0f324394d279528 100644 (file)
@@ -69,7 +69,7 @@ abstract class AbstractTransactionHandler {
      * This thread only removes from futures set.
      */
     private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
-    private final Collection<ListenableFuture<Void>> futures = Collections.synchronizedSet(new HashSet<>());
+    private final Collection<ListenableFuture<?>> futures = Collections.synchronizedSet(new HashSet<>());
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
     private final long runtimeNanos;
     private final long delayNanos;
@@ -118,14 +118,14 @@ abstract class AbstractTransactionHandler {
 
         // Not completed yet: create a transaction and hook it up
         final long txId = txCounter++;
-        final ListenableFuture<Void> execFuture = execWrite(txId);
+        final ListenableFuture<?> execFuture = execWrite(txId);
         LOG.debug("New future #{} allocated", txId);
 
         // Ordering is important: we need to add the future before hooking the callback
         futures.add(execFuture);
-        Futures.addCallback(execFuture, new FutureCallback<Void>() {
+        Futures.addCallback(execFuture, new FutureCallback<Object>() {
             @Override
-            public void onSuccess(final Void result) {
+            public void onSuccess(final Object result) {
                 txSuccess(execFuture, txId);
             }
 
@@ -157,7 +157,7 @@ abstract class AbstractTransactionHandler {
         return false;
     }
 
-    final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+    final void txSuccess(final ListenableFuture<?> execFuture, final long txId) {
         LOG.debug("Future #{} completed successfully", txId);
         futures.remove(execFuture);
 
@@ -175,7 +175,7 @@ abstract class AbstractTransactionHandler {
         }
     }
 
-    final void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+    final void txFailure(final ListenableFuture<?> execFuture, final long txId, final Throwable cause) {
         LOG.debug("Future #{} failed", txId, cause);
         futures.remove(execFuture);
 
@@ -205,7 +205,7 @@ abstract class AbstractTransactionHandler {
         synchronized (futures) {
             int offset = 0;
 
-            for (ListenableFuture<Void> future : futures) {
+            for (ListenableFuture<?> future : futures) {
                 try {
                     future.get(0, TimeUnit.NANOSECONDS);
                 } catch (final TimeoutException e) {
@@ -224,7 +224,7 @@ abstract class AbstractTransactionHandler {
         runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
     }
 
-    abstract ListenableFuture<Void> execWrite(long txId);
+    abstract ListenableFuture<?> execWrite(long txId);
 
     abstract void runFailed(Throwable cause);
 
index efc64a9ab630af9e3c9de701790daa5867df17e4..163d46ab9e6cfae8bd59571534017bc105cfe016 100644 (file)
@@ -94,10 +94,10 @@ public class PrefixShardHandler {
                 LOG.debug("Shard[{}] created successfully.", identifier);
                 registrations.put(identifier, registration);
 
-                final ListenableFuture<Void> ensureFuture = ensureListExists();
-                Futures.addCallback(ensureFuture, new FutureCallback<Void>() {
+                final ListenableFuture<?> ensureFuture = ensureListExists();
+                Futures.addCallback(ensureFuture, new FutureCallback<Object>() {
                     @Override
-                    public void onSuccess(@Nullable final Void result) {
+                    public void onSuccess(@Nullable final Object result) {
                         LOG.debug("Initial list write successful.");
                         future.set(RpcResultBuilder.success(new CreatePrefixShardOutputBuilder().build()).build());
                     }
@@ -160,7 +160,7 @@ public class PrefixShardHandler {
         return future;
     }
 
-    private ListenableFuture<Void> ensureListExists() {
+    private ListenableFuture<?> ensureListExists() {
 
         final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ID_INT);
 
@@ -190,10 +190,10 @@ public class PrefixShardHandler {
         cursor.merge(containerNode.getIdentifier(), containerNode);
         cursor.close();
 
-        final ListenableFuture<Void> future = tx.submit();
-        Futures.addCallback(future, new FutureCallback<Void>() {
+        final ListenableFuture<?> future = tx.commit();
+        Futures.addCallback(future, new FutureCallback<Object>() {
             @Override
-            public void onSuccess(@Nullable final Void result) {
+            public void onSuccess(@Nullable final Object result) {
                 try {
                     LOG.debug("Closing producer for initial list.");
                     producer.close();
index 164e81ad00b7a19c82ef21460f0c1a20caa760d8..99e48b2980975c5d281c5c4a23fbcb7a4a6fdb23 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.clustering.it.provider.impl;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -19,6 +20,8 @@ import java.util.SplittableRandom;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
@@ -78,7 +81,7 @@ public final class ProduceTransactionsHandler extends AbstractTransactionHandler
         cursor.close();
 
         try {
-            tx.submit().get(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            tx.commit().get(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             LOG.warn("Unable to fill the initial item list.", e);
             closeProducer(itemProducer);
@@ -104,7 +107,7 @@ public final class ProduceTransactionsHandler extends AbstractTransactionHandler
     }
 
     @Override
-    ListenableFuture<Void> execWrite(final long txId) {
+    FluentFuture<? extends @NonNull CommitInfo> execWrite(final long txId) {
         final int i = random.nextInt(MAX_ITEM + 1);
         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
         final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem);
@@ -128,7 +131,7 @@ public final class ProduceTransactionsHandler extends AbstractTransactionHandler
 
         cursor.close();
 
-        return tx.submit();
+        return tx.commit();
     }
 
     @Override