Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Merge "BUG-2801: Added filtering for get and getConfig in netconf mdsal northbound."
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
TransactionChainProxy.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
index 87959efe8ae2def5684e253f2e0840c7177db838..58ac1d8b8265bc50fb7d38dea1dd9c1b916211fc 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
@@
-104,11
+104,13
@@
public class TransactionChainProxy implements DOMStoreTransactionChain {
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ actorContext.acquireTxCreationPermit();
return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ actorContext.acquireTxCreationPermit();
return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
}
return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
}
@@
-173,45
+175,47
@@
public class TransactionChainProxy implements DOMStoreTransactionChain {
/**
* This method is overridden to ensure the previous Tx's ready operations complete
/**
* This method is overridden to ensure the previous Tx's ready operations complete
- * before we
create the next shard
Tx in the chain to avoid creation failures if the
+ * before we
initiate the next
Tx in the chain to avoid creation failures if the
* previous Tx's ready operations haven't completed yet.
*/
@Override
* previous Tx's ready operations haven't completed yet.
*/
@Override
- protected Future<Object> sendCreateTransaction(final ActorSelection shard,
- final Object serializedCreateMessage) {
-
+ protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
// Check if there are any previous ready Futures, otherwise let the super class handle it.
if(previousReadyFutures.isEmpty()) {
// Check if there are any previous ready Futures, otherwise let the super class handle it.
if(previousReadyFutures.isEmpty()) {
- return super.sendCreateTransaction(shard, serializedCreateMessage);
+ return super.sendFindPrimaryShardAsync(shardName);
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
+ previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
}
// Combine the ready Futures into 1.
Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
}
// Combine the ready Futures into 1.
Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
- previousReadyFutures, getActorContext().get
ActorSystem().d
ispatcher());
+ previousReadyFutures, getActorContext().get
ClientD
ispatcher());
// Add a callback for completion of the combined Futures.
// Add a callback for completion of the combined Futures.
- final Promise<
Object> createTx
Promise = akka.dispatch.Futures.promise();
+ final Promise<
ActorSelection> return
Promise = akka.dispatch.Futures.promise();
OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
@Override
public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
if(failure != null) {
// A Ready Future failed so fail the returned Promise.
OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
@Override
public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
if(failure != null) {
// A Ready Future failed so fail the returned Promise.
-
createTx
Promise.failure(failure);
+
return
Promise.failure(failure);
} else {
} else {
- LOG.debug("Previous Tx readied - sending
CreateTransaction
for {} on chain {}",
+ LOG.debug("Previous Tx readied - sending
FindPrimaryShard
for {} on chain {}",
getIdentifier(), getTransactionChainId());
getIdentifier(), getTransactionChainId());
- // Send the
CreateTx
message and use the resulting Future to complete the
+ // Send the
FindPrimaryShard
message and use the resulting Future to complete the
// returned Promise.
// returned Promise.
- createTxPromise.completeWith(getActorContext().executeOperationAsync(shard,
- serializedCreateMessage));
+ returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
}
}
};
}
}
};
- combinedFutures.onComplete(onComplete, getActorContext().get
ActorSystem().d
ispatcher());
+ combinedFutures.onComplete(onComplete, getActorContext().get
ClientD
ispatcher());
- return
createTx
Promise.future();
+ return
return
Promise.future();
}
}
}
}
}
}