import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
+import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
return actorContext;
}
- protected Future<Object> executeOperationAsync(SerializableMessage msg) {
- return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable()));
+ protected Future<Object> executeOperationAsync(SerializableMessage msg, Timeout timeout) {
+ return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable(), timeout));
}
@Override
}
private BatchedModifications newBatchedModifications() {
- return new BatchedModifications(getIdentifier().toString(), getTransactionVersion(),
- getIdentifier().getChainId());
+ return new BatchedModifications(TransactionIdentifierUtils.actorNameFor(getIdentifier()),
+ getTransactionVersion(), RemoteTransactionContextSupport.compatTransactionChainId(getIdentifier()));
}
private void batchModification(Modification modification) {
batchedModifications.setReady(ready);
batchedModifications.setDoCommitOnReady(doCommitOnReady);
batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
- sent = executeOperationAsync(batchedModifications);
+ sent = executeOperationAsync(batchedModifications, actorContext.getTransactionCommitOperationTimeout());
if(ready) {
batchedModifications = null;
}
};
- Future<Object> future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()));
+ Future<Object> future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()),
+ actorContext.getOperationTimeout());
future.onComplete(onComplete, actorContext.getClientDispatcher());
}