Do not require ListeningExecutorService 10/106810/2
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 4 Jul 2023 19:32:17 +0000 (21:32 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 4 Jul 2023 19:46:50 +0000 (21:46 +0200)
SerializedDOMDataBroker can easily operate on any Executor with the
help of Futures.submit(). Drop the requirement to have a
ListeningExecutorService and operate on plain Executor.

Change-Id: I5e5bc1f0c7bfa93f443bc18755d48beafc22f7f0
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/SerializedDOMDataBroker.java
dom/mdsal-dom-inmemory-datastore-benchmark/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/benchmark/InMemoryBrokerWriteTransactionBenchmark.java

index 4ed4eb862dfa27f50b27b2d3ea57f4965584e3ae..0ea285710b9d0dfbe56e0691db791b092856e559 100644 (file)
@@ -10,13 +10,17 @@ package org.opendaylight.mdsal.dom.broker;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.broker.CommitCoordinationTask.WithTracker;
 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
@@ -38,37 +42,53 @@ public final class SerializedDOMDataBroker extends AbstractDOMDataBroker {
     private static final Logger LOG = LoggerFactory.getLogger(SerializedDOMDataBroker.class);
 
     private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
-    private final ListeningExecutorService executor;
+    private final Executor executor;
 
     /**
      * Construct DOMDataCommitCoordinator which uses supplied executor to
      * process commit coordinations.
      *
      * @param datastores the Map of backing DOMStore instances
-     * @param executor the ListeningExecutorService to use
+     * @param executor the Executor to use
      */
     public SerializedDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores,
-            final ListeningExecutorService executor) {
+            final Executor executor) {
         super(datastores);
         this.executor = requireNonNull(executor, "executor must not be null.");
     }
 
+    /**
+     * Construct DOMDataCommitCoordinator which uses supplied executor to process commit coordinations.
+     *
+     * @param datastores the Map of backing DOMStore instances
+     * @param executor the {@link ListeningExecutorService} to use
+     * @deprecated Use {@link #SerializedDOMDataBroker(Map, Executor)} instead.
+     */
+    @Deprecated(since = "12.0.1", forRemoval = true)
+    public SerializedDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores,
+            final ListeningExecutorService executor) {
+        this(datastores, (Executor) executor);
+    }
+
     public DurationStatisticsTracker getCommitStatsTracker() {
         return commitStatsTracker;
     }
 
     @Override
-    protected FluentFuture<? extends CommitInfo> commit(final DOMDataTreeWriteTransaction transaction,
+    protected FluentFuture<CommitInfo> commit(final DOMDataTreeWriteTransaction transaction,
             final DOMStoreThreePhaseCommitCohort cohort) {
         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
 
+        final ListenableFuture<CommitInfo> future;
         try {
-            return FluentFuture.from(executor.submit(
-                new CommitCoordinationTask.WithTracker(transaction, cohort, commitStatsTracker)));
+            // FIXME: use FluentFutures.submit() once it is available
+            future = Futures.submit(new WithTracker(transaction, cohort, commitStatsTracker), executor);
         } catch (RejectedExecutionException e) {
             LOG.error("The commit executor's queue is full - submit task was rejected. \n{}", executor, e);
             return FluentFutures.immediateFailedFluentFuture(new TransactionCommitFailedException(
                 "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
         }
+
+        return FluentFuture.from(future);
     }
 }
index 516a2552c2f68c73362121364b82273bc4af805e..d6998c419bddd95b9eb937a5a88a9d42bed2dab9 100644 (file)
@@ -8,9 +8,9 @@
 package org.opendaylight.mdsal.dom.store.inmemory.benchmark;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -32,15 +32,14 @@ import org.openjdk.jmh.annotations.State;
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 @Fork(1)
 public class InMemoryBrokerWriteTransactionBenchmark extends AbstractInMemoryBrokerWriteTransactionBenchmark {
-    private ListeningExecutorService executor = null;
+    private ExecutorService executor = null;
 
     @Setup(Level.Trial)
     @Override
     public void setUp() throws Exception {
-        ListeningExecutorService dsExec = MoreExecutors.newDirectExecutorService();
-        executor = MoreExecutors.listeningDecorator(
-                MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(1), 1L,
-                        TimeUnit.SECONDS));
+        var dsExec = MoreExecutors.newDirectExecutorService();
+        executor = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(1), 1L,
+                        TimeUnit.SECONDS);
 
         InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", dsExec);
         InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", dsExec);