import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
private final ActorSelection actor;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
+ private final OperationLimiter limiter;
- private final OperationCompleter operationCompleter;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
- protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier,
+ protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
ActorContext actorContext, boolean isTxActorLocal,
- short remoteTransactionVersion, OperationCompleter operationCompleter) {
+ short remoteTransactionVersion, OperationLimiter limiter) {
super(identifier);
+ this.limiter = Preconditions.checkNotNull(limiter);
this.actor = actor;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
- this.operationCompleter = operationCompleter;
}
private Future<Object> completeOperation(Future<Object> operationFuture){
- operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
+ operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
return operationFuture;
}
-
private ActorSelection getActor() {
return actor;
}
@Override
public Future<ActorSelection> readyTransaction() {
+ logModificationCount();
+
LOG.debug("Tx {} readyTransaction called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
}
private void batchModification(Modification modification) {
+ incrementModificationCount();
if(batchedModifications == null) {
batchedModifications = newBatchedModifications();
}
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
+ acquireOperation();
batchModification(new DeleteModification(path));
}
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
+ acquireOperation();
batchModification(new MergeModification(path, data));
}
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
+ acquireOperation();
batchModification(new WriteModification(path, data));
}
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
+ acquireOperation();
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
+ acquireOperation();
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
future.onComplete(onComplete, actorContext.getClientDispatcher());
}
+
+ /**
+ * Acquire operation from the limiter if the hand-off has completed. If
+ * the hand-off is still ongoing, this method does nothing.
+ */
+ private final void acquireOperation() {
+ if (isOperationHandOffComplete()) {
+ limiter.acquire();
+ }
+ }
+
+ @Override
+ public boolean usesOperationLimiting() {
+ return true;
+ }
}