import static java.util.Objects.requireNonNull;
import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.Map;
+import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.broker.CommitCoordinationTask.WithTracker;
+import org.opendaylight.mdsal.dom.spi.AbstractDOMDataBroker;
import org.opendaylight.mdsal.dom.spi.store.DOMStore;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.util.DurationStatisticsTracker;
private static final Logger LOG = LoggerFactory.getLogger(SerializedDOMDataBroker.class);
private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
- private final ListeningExecutorService executor;
+ private final Executor executor;
/**
* Construct DOMDataCommitCoordinator which uses supplied executor to
* process commit coordinations.
*
* @param datastores the Map of backing DOMStore instances
- * @param executor the ListeningExecutorService to use
+ * @param executor the Executor to use
*/
- public SerializedDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores,
- final ListeningExecutorService executor) {
+ public SerializedDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, final Executor executor) {
super(datastores);
this.executor = requireNonNull(executor, "executor must not be null.");
}
}
@Override
- protected FluentFuture<? extends CommitInfo> commit(final DOMDataTreeWriteTransaction transaction,
+ protected FluentFuture<CommitInfo> commit(final DOMDataTreeWriteTransaction transaction,
final DOMStoreThreePhaseCommitCohort cohort) {
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
+ final ListenableFuture<CommitInfo> future;
try {
- return FluentFuture.from(executor.submit(
- new CommitCoordinationTask.WithTracker(transaction, cohort, commitStatsTracker)));
+ // FIXME: use FluentFutures.submit() once it is available
+ future = Futures.submit(new WithTracker(transaction, cohort, commitStatsTracker), executor);
} catch (RejectedExecutionException e) {
LOG.error("The commit executor's queue is full - submit task was rejected. \n{}", executor, e);
return FluentFutures.immediateFailedFluentFuture(new TransactionCommitFailedException(
"Could not submit the commit task - the commit queue capacity has been exceeded.", e));
}
+
+ return FluentFuture.from(future);
}
}