import akka.dispatch.OnComplete;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
- private final OperationCompleter operationCompleter;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
- protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier,
+ protected RemoteTransactionContext(ActorSelection actor,
ActorContext actorContext, boolean isTxActorLocal,
- short remoteTransactionVersion, OperationCompleter operationCompleter) {
- super(identifier);
+ short remoteTransactionVersion, OperationLimiter limiter) {
+ super(limiter);
this.actor = actor;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
- this.operationCompleter = operationCompleter;
}
private Future<Object> completeOperation(Future<Object> operationFuture){
- operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
+ operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
return operationFuture;
}
-
private ActorSelection getActor() {
return actor;
}
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
+ acquireOperation();
batchModification(new DeleteModification(path));
}
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
+ acquireOperation();
batchModification(new MergeModification(path, data));
}
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
+ acquireOperation();
batchModification(new WriteModification(path, data));
}
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
+ acquireOperation();
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
+ acquireOperation();
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {