import static java.util.Objects.requireNonNull;
import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.Map;
+import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.broker.CommitCoordinationTask.WithTracker;
import org.opendaylight.mdsal.dom.spi.store.DOMStore;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.util.DurationStatisticsTracker;
private static final Logger LOG = LoggerFactory.getLogger(SerializedDOMDataBroker.class);
private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
- private final ListeningExecutorService executor;
+ private final Executor executor;
/**
* Construct DOMDataCommitCoordinator which uses supplied executor to
* process commit coordinations.
*
* @param datastores the Map of backing DOMStore instances
- * @param executor the ListeningExecutorService to use
+ * @param executor the Executor to use
*/
public SerializedDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores,
- final ListeningExecutorService executor) {
+ final Executor executor) {
super(datastores);
this.executor = requireNonNull(executor, "executor must not be null.");
}
+ /**
+ * Construct DOMDataCommitCoordinator which uses supplied executor to process commit coordinations.
+ *
+ * @param datastores the Map of backing DOMStore instances
+ * @param executor the {@link ListeningExecutorService} to use
+ * @deprecated Use {@link #SerializedDOMDataBroker(Map, Executor)} instead.
+ */
+ @Deprecated(since = "12.0.1", forRemoval = true)
+ public SerializedDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores,
+ final ListeningExecutorService executor) {
+ this(datastores, (Executor) executor);
+ }
+
public DurationStatisticsTracker getCommitStatsTracker() {
return commitStatsTracker;
}
@Override
- protected FluentFuture<? extends CommitInfo> commit(final DOMDataTreeWriteTransaction transaction,
+ protected FluentFuture<CommitInfo> commit(final DOMDataTreeWriteTransaction transaction,
final DOMStoreThreePhaseCommitCohort cohort) {
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
+ final ListenableFuture<CommitInfo> future;
try {
- return FluentFuture.from(executor.submit(
- new CommitCoordinationTask.WithTracker(transaction, cohort, commitStatsTracker)));
+ // FIXME: use FluentFutures.submit() once it is available
+ future = Futures.submit(new WithTracker(transaction, cohort, commitStatsTracker), executor);
} catch (RejectedExecutionException e) {
LOG.error("The commit executor's queue is full - submit task was rejected. \n{}", executor, e);
return FluentFutures.immediateFailedFluentFuture(new TransactionCommitFailedException(
"Could not submit the commit task - the commit queue capacity has been exceeded.", e));
}
+
+ return FluentFuture.from(future);
}
}
package org.opendaylight.mdsal.dom.store.inmemory.benchmark;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
public class InMemoryBrokerWriteTransactionBenchmark extends AbstractInMemoryBrokerWriteTransactionBenchmark {
- private ListeningExecutorService executor = null;
+ private ExecutorService executor = null;
@Setup(Level.Trial)
@Override
public void setUp() throws Exception {
- ListeningExecutorService dsExec = MoreExecutors.newDirectExecutorService();
- executor = MoreExecutors.listeningDecorator(
- MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(1), 1L,
- TimeUnit.SECONDS));
+ var dsExec = MoreExecutors.newDirectExecutorService();
+ executor = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(1), 1L,
+ TimeUnit.SECONDS);
InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", dsExec);
InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", dsExec);