Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Bug 2038: Ensure only one concurrent 3-phase commit in Shard
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
TransactionProxy.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
index 19d9a66a528eb417d5bff41948641b68e9c8e481..ec198510d3586f88dee40d04f9294a3fb370a9ae 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
@@
-8,7
+8,6
@@
package org.opendaylight.controller.cluster.datastore;
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
@@
-315,7
+314,7
@@
public class TransactionProxy implements DOMStoreReadWriteTransaction {
LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
remoteTransactionPaths.size());
}
LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
remoteTransactionPaths.size());
}
- List<Future<Actor
Path>> cohortPath
Futures = Lists.newArrayList();
+ List<Future<Actor
Selection>> cohort
Futures = Lists.newArrayList();
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
@@
-323,14
+322,14
@@
public class TransactionProxy implements DOMStoreReadWriteTransaction {
LOG.debug("Tx {} Readying transaction for shard {}", identifier,
transactionContext.getShardName());
}
LOG.debug("Tx {} Readying transaction for shard {}", identifier,
transactionContext.getShardName());
}
- cohort
Path
Futures.add(transactionContext.readyTransaction());
+ cohortFutures.add(transactionContext.readyTransaction());
}
if(transactionChainProxy != null){
}
if(transactionChainProxy != null){
- transactionChainProxy.onTransactionReady(cohort
Path
Futures);
+ transactionChainProxy.onTransactionReady(cohortFutures);
}
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohort
Path
Futures,
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
identifier.toString());
}
identifier.toString());
}
@@
-439,7
+438,7
@@
public class TransactionProxy implements DOMStoreReadWriteTransaction {
void closeTransaction();
void closeTransaction();
- Future<Actor
Path
> readyTransaction();
+ Future<Actor
Selection
> readyTransaction();
void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
@@
-499,10
+498,6
@@
public class TransactionProxy implements DOMStoreReadWriteTransaction {
return actor;
}
return actor;
}
- private String getResolvedCohortPath(String cohortPath) {
- return actorContext.resolvePath(actorPath, cohortPath);
- }
-
@Override
public void closeTransaction() {
if(LOG.isDebugEnabled()) {
@Override
public void closeTransaction() {
if(LOG.isDebugEnabled()) {
@@
-512,7
+507,7
@@
public class TransactionProxy implements DOMStoreReadWriteTransaction {
}
@Override
}
@Override
- public Future<Actor
Path
> readyTransaction() {
+ public Future<Actor
Selection
> readyTransaction() {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
identifier, recordedOperationFutures.size());
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
identifier, recordedOperationFutures.size());
@@
-538,9
+533,9
@@
public class TransactionProxy implements DOMStoreReadWriteTransaction {
// Transform the combined Future into a Future that returns the cohort actor path from
// the ReadyTransactionReply. That's the end result of the ready operation.
// Transform the combined Future into a Future that returns the cohort actor path from
// the ReadyTransactionReply. That's the end result of the ready operation.
- return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, Actor
Path
>() {
+ return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, Actor
Selection
>() {
@Override
@Override
- public Actor
Path
apply(Iterable<Object> notUsed) {
+ public Actor
Selection
apply(Iterable<Object> notUsed) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
identifier);
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
identifier);
@@
-557,16
+552,9
@@
public class TransactionProxy implements DOMStoreReadWriteTransaction {
if(serializedReadyReply.getClass().equals(
ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
if(serializedReadyReply.getClass().equals(
ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
- actorContext.getActorSystem(), serializedReadyReply);
-
- String resolvedCohortPath = getResolvedCohortPath(
- reply.getCohortPath().toString());
+ serializedReadyReply);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
- identifier, resolvedCohortPath);
- }
- return actorContext.actorFor(resolvedCohortPath);
+ return actorContext.actorSelection(reply.getCohortPath());
} else {
// Throwing an exception here will fail the Future.
} else {
// Throwing an exception here will fail the Future.
@@
-805,7
+793,7
@@
public class TransactionProxy implements DOMStoreReadWriteTransaction {
}
@Override
}
@Override
- public Future<Actor
Path
> readyTransaction() {
+ public Future<Actor
Selection
> readyTransaction() {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction called", identifier);
}
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction called", identifier);
}