Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
CONTROLLER-1641: Handle commit cohorts async
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
ShardDataTree.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index 9aca517bb110d2f28381167962f2ad8f3de288af..a155e5e0e049ab5afc19e2dc969bbcd775c0a494 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
@@
-20,6
+20,7
@@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Iterables;
import com.google.common.primitives.UnsignedLong;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Iterables;
import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
@@
-31,7
+32,6
@@
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
@@
-808,7
+808,7
@@
public class ShardDataTree extends ShardDataTreeTransactionParent {
processNextPendingTransaction();
}
processNextPendingTransaction();
}
- private void failPreCommit(final
Exception
cause) {
+ private void failPreCommit(final
Throwable
cause) {
shard.getShardMBean().incrementFailedTransactionsCount();
pendingTransactions.poll().cohort.failedPreCommit(cause);
processNextPendingTransaction();
shard.getShardMBean().incrementFailedTransactionsCount();
pendingTransactions.poll().cohort.failedPreCommit(cause);
processNextPendingTransaction();
@@
-827,25
+827,34
@@
public class ShardDataTree extends ShardDataTreeTransactionParent {
final DataTreeCandidateTip candidate;
try {
candidate = tip.prepare(cohort.getDataTreeModification());
final DataTreeCandidateTip candidate;
try {
candidate = tip.prepare(cohort.getDataTreeModification());
- cohort.userPreCommit(candidate);
- } catch (ExecutionException | TimeoutException | RuntimeException e) {
+ } catch (RuntimeException e) {
failPreCommit(e);
return;
}
failPreCommit(e);
return;
}
- // Set the tip of the data tree.
- tip = Verify.verifyNotNull(candidate);
+ cohort.userPreCommit(candidate, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void noop) {
+ // Set the tip of the data tree.
+ tip = Verify.verifyNotNull(candidate);
- entry.lastAccess = readTime();
+
entry.lastAccess = readTime();
- pendingTransactions.remove();
- pendingCommits.add(entry);
+
pendingTransactions.remove();
+
pendingCommits.add(entry);
- LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+
LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
- cohort.successfulPreCommit(candidate);
+
cohort.successfulPreCommit(candidate);
- processNextPendingTransaction();
+ processNextPendingTransaction();
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ failPreCommit(failure);
+ }
+ });
}
private void failCommit(final Exception cause) {
}
private void failCommit(final Exception cause) {
@@
-878,12
+887,12
@@
public class ShardDataTree extends ShardDataTreeTransactionParent {
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
// FIXME: propagate journal index
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
// FIXME: propagate journal index
- pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+ pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO, () -> {
+ LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+ notifyListeners(candidate);
- LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
- notifyListeners(candidate);
-
- processNextPending();
+ processNextPending();
+ });
}
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
}
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
@@
-958,7
+967,8
@@
public class ShardDataTree extends ShardDataTreeTransactionParent {
@Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
@Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
- cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+ cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
+ COMMIT_STEP_TIMEOUT));
pendingTransactions.add(new CommitEntry(cohort, readTime()));
return cohort;
}
pendingTransactions.add(new CommitEntry(cohort, readTime()));
return cohort;
}
@@
-1142,11
+1152,10
@@
public class ShardDataTree extends ShardDataTreeTransactionParent {
try {
tip.validate(cohort.getDataTreeModification());
DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
try {
tip.validate(cohort.getDataTreeModification());
DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
- cohort.userPreCommit(candidate);
cohort.setNewCandidate(candidate);
tip = candidate;
cohort.setNewCandidate(candidate);
tip = candidate;
- } catch (
ExecutionException | TimeoutException |
RuntimeException | DataValidationFailedException e) {
+ } catch (RuntimeException | DataValidationFailedException e) {
LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
cohort.reportFailure(e);
}
LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
cohort.reportFailure(e);
}