package org.opendaylight.controller.cluster.sharding;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
public ListenableFuture<Void> submit() {
LOG.debug("Submitting transaction for shard {}", shardRoot);
- Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ checkTransactionReadied();
final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
final AsyncFunction<Void, Void> prepareFunction = input -> commit();
// transform validate into prepare
- final ListenableFuture<Void> prepareFuture = Futures.transform(validate(), validateFunction);
+ final ListenableFuture<Void> prepareFuture = Futures.transformAsync(validate(), validateFunction,
+ MoreExecutors.directExecutor());
// transform prepare into commit and return as submit result
- return Futures.transform(prepareFuture, prepareFunction);
+ return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor());
+ }
+
+ private void checkTransactionReadied() {
+ Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
}
@Override
public ListenableFuture<Boolean> validate() {
LOG.debug("Validating transaction for shard {}", shardRoot);
- Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ checkTransactionReadied();
final List<ListenableFuture<Boolean>> futures =
cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
final SettableFuture<Boolean> ret = SettableFuture.create();
public void onFailure(final Throwable throwable) {
ret.setException(throwable);
}
- });
+ }, MoreExecutors.directExecutor());
return ret;
}
public ListenableFuture<Void> prepare() {
LOG.debug("Preparing transaction for shard {}", shardRoot);
- Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ checkTransactionReadied();
final List<ListenableFuture<Void>> futures =
cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
final SettableFuture<Void> ret = SettableFuture.create();
public void onFailure(final Throwable throwable) {
ret.setException(throwable);
}
- });
+ }, MoreExecutors.directExecutor());
return ret;
}
public ListenableFuture<Void> commit() {
LOG.debug("Committing transaction for shard {}", shardRoot);
- Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ checkTransactionReadied();
final List<ListenableFuture<Void>> futures =
cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
final SettableFuture<Void> ret = SettableFuture.create();
public void onFailure(final Throwable throwable) {
ret.setException(throwable);
}
- });
+ }, MoreExecutors.directExecutor());
return ret;
}