import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Iterables;
import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
processNextPendingTransaction();
}
- private void failPreCommit(final Exception cause) {
+ private void failPreCommit(final Throwable cause) {
shard.getShardMBean().incrementFailedTransactionsCount();
pendingTransactions.poll().cohort.failedPreCommit(cause);
processNextPendingTransaction();
final DataTreeCandidateTip candidate;
try {
candidate = tip.prepare(cohort.getDataTreeModification());
- cohort.userPreCommit(candidate);
- } catch (ExecutionException | TimeoutException | RuntimeException e) {
+ } catch (RuntimeException e) {
failPreCommit(e);
return;
}
- // Set the tip of the data tree.
- tip = Verify.verifyNotNull(candidate);
+ cohort.userPreCommit(candidate, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void noop) {
+ // Set the tip of the data tree.
+ tip = Verify.verifyNotNull(candidate);
- entry.lastAccess = readTime();
+ entry.lastAccess = readTime();
- pendingTransactions.remove();
- pendingCommits.add(entry);
+ pendingTransactions.remove();
+ pendingCommits.add(entry);
- LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+ LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
- cohort.successfulPreCommit(candidate);
+ cohort.successfulPreCommit(candidate);
- processNextPendingTransaction();
+ processNextPendingTransaction();
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ failPreCommit(failure);
+ }
+ });
}
private void failCommit(final Exception cause) {
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
// FIXME: propagate journal index
- pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+ pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO, () -> {
+ LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+ notifyListeners(candidate);
- LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
- notifyListeners(candidate);
-
- processNextPending();
+ processNextPending();
+ });
}
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
@Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
- cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+ cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
+ COMMIT_STEP_TIMEOUT));
pendingTransactions.add(new CommitEntry(cohort, readTime()));
return cohort;
}
try {
tip.validate(cohort.getDataTreeModification());
DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
- cohort.userPreCommit(candidate);
cohort.setNewCandidate(candidate);
tip = candidate;
- } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+ } catch (RuntimeException | DataValidationFailedException e) {
LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
cohort.reportFailure(e);
}