package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
-import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
-public class TransactionContextImpl extends AbstractTransactionContext {
- private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+/**
+ * Redirects front-end transaction operations to a shard for processing. Instances of this class are used
+ * when the destination shard is remote to the caller.
+ *
+ * @author Thomas Pantelis
+ */
+public class RemoteTransactionContext extends AbstractTransactionContext {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class);
private final ActorContext actorContext;
private final ActorSelection actor;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
- protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationCompleter operationCompleter) {
super(identifier);
@Override
public void closeTransaction() {
LOG.debug("Tx {} closeTransaction called", getIdentifier());
+ TransactionContextCleanup.untrack(this);
actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
}
// Transform the last reply Future into a Future that returns the cohort actor path from
// the last reply message. That's the end result of the ready operation.
- return readyReplyFuture.transform(new Mapper<Object, ActorSelection>() {
- @Override
- public ActorSelection checkedApply(Object serializedReadyReply) {
- LOG.debug("Tx {} readyTransaction", getIdentifier());
-
- // At this point the ready operation succeeded and we need to extract the cohort
- // actor path from the reply.
- if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) {
- ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
- return actorContext.actorSelection(extractCohortPathFrom(readyTxReply));
- }
-
- // Throwing an exception here will fail the Future.
- throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
- getIdentifier(), serializedReadyReply.getClass()));
- }
- }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
- }
-
- protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
- return readyTxReply.getCohortPath();
+ return TransactionReadyReplyMapper.transform(readyReplyFuture, actorContext, getIdentifier());
}
private BatchedModifications newBatchedModifications() {