import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
this.actorContext = actorContext;
}
- private Future<Object> completeOperation(Future<Object> operationFuture){
+ private Future<Object> completeOperation(Future<Object> operationFuture) {
operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
return operationFuture;
}
}
private BatchedModifications newBatchedModifications() {
- return new BatchedModifications(TransactionIdentifierUtils.actorNameFor(getIdentifier()),
- getTransactionVersion(), RemoteTransactionContextSupport.compatTransactionChainId(getIdentifier()));
+ return new BatchedModifications(getIdentifier(), getTransactionVersion());
}
private void batchModification(Modification modification) {
incrementModificationCount();
- if(batchedModifications == null) {
+ if (batchedModifications == null) {
batchedModifications = newBatchedModifications();
}
batchedModifications.addModification(modification);
- if(batchedModifications.getModifications().size() >=
- actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
+ if (batchedModifications.getModifications().size()
+ >= actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
sendBatchedModifications();
}
}
protected Future<Object> sendBatchedModifications(boolean ready, boolean doCommitOnReady) {
Future<Object> sent = null;
- if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
- if(batchedModifications == null) {
+ if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
+ if (batchedModifications == null) {
batchedModifications = newBatchedModifications();
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
- batchedModifications.getModifications().size(), ready);
- }
+ LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
+ batchedModifications.getModifications().size(), ready);
batchedModifications.setReady(ready);
batchedModifications.setDoCommitOnReady(doCommitOnReady);
batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
- sent = executeOperationAsync(batchedModifications, actorContext.getTransactionCommitOperationTimeout());
- if(ready) {
+ final BatchedModifications toSend = batchedModifications;
+ if (ready) {
batchedModifications = null;
} else {
batchedModifications = newBatchedModifications();
}
+
+ sent = executeOperationAsync(toSend, actorContext.getTransactionCommitOperationTimeout());
}
return sent;
@Override
public void executeModification(AbstractModification modification) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass()
- .getSimpleName(), modification.getPath());
- }
+ LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
+ modification.getClass().getSimpleName(), modification.getPath());
acquireOperation();
batchModification(modification);
@Override
public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
- readCmd.getPath());
- }
+ LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+ readCmd.getPath());
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
- if(failure != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(),
- failure);
- }
- returnFuture.setException(new ReadFailedException("Error checking " + readCmd.getClass().getSimpleName()
- + " for path " + readCmd.getPath(), failure));
+ if (failure != null) {
+ LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+ failure);
+
+ returnFuture.setException(new ReadFailedException("Error checking "
+ + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
} else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName());
- }
+ LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName());
readCmd.processResponse(response, returnFuture);
}
}
* Acquire operation from the limiter if the hand-off has completed. If
* the hand-off is still ongoing, this method does nothing.
*/
- private final void acquireOperation() {
+ private void acquireOperation() {
if (isOperationHandOffComplete()) {
limiter.acquire();
}