import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
+import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
private final ActorContext actorContext;
private final ActorSelection actor;
- private final boolean isTxActorLocal;
- private final short remoteTransactionVersion;
private final OperationLimiter limiter;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
- ActorContext actorContext, boolean isTxActorLocal,
- short remoteTransactionVersion, OperationLimiter limiter) {
- super(identifier);
+ ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
+ super(identifier, remoteTransactionVersion);
this.limiter = Preconditions.checkNotNull(limiter);
this.actor = actor;
this.actorContext = actorContext;
- this.isTxActorLocal = isTxActorLocal;
- this.remoteTransactionVersion = remoteTransactionVersion;
}
private Future<Object> completeOperation(Future<Object> operationFuture){
return actorContext;
}
- protected short getRemoteTransactionVersion() {
- return remoteTransactionVersion;
- }
-
- protected Future<Object> executeOperationAsync(SerializableMessage msg) {
- return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+ protected Future<Object> executeOperationAsync(SerializableMessage msg, Timeout timeout) {
+ return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable(), timeout));
}
@Override
LOG.debug("Tx {} closeTransaction called", getIdentifier());
TransactionContextCleanup.untrack(this);
- actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
- }
-
- @Override
- public boolean supportsDirectCommit() {
- return true;
+ actorContext.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable());
}
@Override
}
private BatchedModifications newBatchedModifications() {
- return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, getIdentifier().getChainId());
+ return new BatchedModifications(getIdentifier(), getTransactionVersion());
}
private void batchModification(Modification modification) {
batchedModifications.setReady(ready);
batchedModifications.setDoCommitOnReady(doCommitOnReady);
batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
- sent = executeOperationAsync(batchedModifications);
+ sent = executeOperationAsync(batchedModifications, actorContext.getTransactionCommitOperationTimeout());
if(ready) {
batchedModifications = null;
}
};
- Future<Object> future = executeOperationAsync(readCmd);
+ Future<Object> future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()),
+ actorContext.getOperationTimeout());
future.onComplete(onComplete, actorContext.getClientDispatcher());
}