X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContext.java;fp=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextImpl.java;h=a25ddc873327accdc31995b160409025b19ce337;hp=b6fe2c29bda9872245c90fa93f7aea7c4eeb6d5c;hb=2f7c93174d7834a4c4aedacc9b88aa53a5a0422c;hpb=864f3c5bb156a7f6f6e2d91a6c7d43e916909e84 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java similarity index 87% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index b6fe2c29bd..a25ddc8733 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; @@ -20,7 +19,6 @@ import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -34,8 +32,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; -public class TransactionContextImpl extends AbstractTransactionContext { - private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class); +/** + * Redirects front-end transaction operations to a shard for processing. Instances of this class are used + * when the destination shard is remote to the caller. + * + * @author Thomas Pantelis + */ +public class RemoteTransactionContext extends AbstractTransactionContext { + private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class); private final ActorContext actorContext; private final ActorSelection actor; @@ -46,7 +50,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { private BatchedModifications batchedModifications; private int totalBatchedModificationsSent; - protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, + protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { super(identifier); @@ -82,6 +86,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { @Override public void closeTransaction() { LOG.debug("Tx {} closeTransaction called", getIdentifier()); + TransactionContextCleanup.untrack(this); actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable()); } @@ -115,27 +120,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Transform the last reply Future into a Future that returns the cohort actor path from // the last reply message. That's the end result of the ready operation. - return readyReplyFuture.transform(new Mapper() { - @Override - public ActorSelection checkedApply(Object serializedReadyReply) { - LOG.debug("Tx {} readyTransaction", getIdentifier()); - - // At this point the ready operation succeeded and we need to extract the cohort - // actor path from the reply. - if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) { - ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply); - return actorContext.actorSelection(extractCohortPathFrom(readyTxReply)); - } - - // Throwing an exception here will fail the Future. - throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", - getIdentifier(), serializedReadyReply.getClass())); - } - }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); - } - - protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) { - return readyTxReply.getCohortPath(); + return TransactionReadyReplyMapper.transform(readyReplyFuture, actorContext, getIdentifier()); } private BatchedModifications newBatchedModifications() {