X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContext.java;h=ade9c375e5cfe93735c712fb8f1650125d9948d9;hb=7ce039b3e55d153fc75bc88198c49536ab83befc;hp=6714815d7ec693a81097e089d1b1d2426dc1f551;hpb=f83b2d36fdd7e953ba72492ffb684cd112aa04a6;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index 6714815d7e..ade9c375e5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -14,6 +14,7 @@ import static java.util.Objects.requireNonNull; import akka.actor.ActorSelection; import akka.dispatch.Futures; import akka.dispatch.OnComplete; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; import java.util.Optional; import java.util.SortedSet; @@ -22,9 +23,14 @@ import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -35,7 +41,7 @@ import scala.concurrent.Future; * * @author Thomas Pantelis */ -public class RemoteTransactionContext extends AbstractTransactionContext { +final class RemoteTransactionContext extends TransactionContext { private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class); private final ActorUtils actorUtils; @@ -54,7 +60,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { */ private volatile Throwable failedModification; - protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor, + RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor, final ActorUtils actorUtils, final short remoteTransactionVersion, final OperationLimiter limiter) { super(identifier, remoteTransactionVersion); this.limiter = requireNonNull(limiter); @@ -71,7 +77,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public void closeTransaction() { + void closeTransaction() { LOG.debug("Tx {} closeTransaction called", getIdentifier()); TransactionContextCleanup.untrack(this); @@ -79,7 +85,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public Future directCommit(final Boolean havePermit) { + Future directCommit(final Boolean havePermit) { LOG.debug("Tx {} directCommit called", getIdentifier()); // Send the remaining batched modifications, if any, with the ready flag set. @@ -88,7 +94,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public Future readyTransaction(final Boolean havePermit, + Future readyTransaction(final Boolean havePermit, final Optional> participatingShardNames) { logModificationCount(); @@ -99,7 +105,9 @@ public class RemoteTransactionContext extends AbstractTransactionContext { bumpPermits(havePermit); Future lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames); - return transformReadyReply(lastModificationsFuture); + // Transform the last reply Future into a Future that returns the cohort actor path from + // the last reply message. That's the end result of the ready operation. + return TransactionReadyReplyMapper.transform(lastModificationsFuture, actorUtils, getIdentifier()); } private void bumpPermits(final Boolean havePermit) { @@ -108,13 +116,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } } - protected Future transformReadyReply(final Future readyReplyFuture) { - // Transform the last reply Future into a Future that returns the cohort actor path from - // the last reply message. That's the end result of the ready operation. - - return TransactionReadyReplyMapper.transform(readyReplyFuture, actorUtils, getIdentifier()); - } - private BatchedModifications newBatchedModifications() { return new BatchedModifications(getIdentifier(), getTransactionVersion()); } @@ -137,11 +138,12 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } } - protected Future sendBatchedModifications() { + @VisibleForTesting + Future sendBatchedModifications() { return sendBatchedModifications(false, false, Optional.empty()); } - protected Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady, + private Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady, final Optional> participatingShardNames) { Future sent = null; if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) { @@ -179,7 +181,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { sent = actorUtils.executeOperationAsync(getActor(), toSend.toSerializable(), actorUtils.getTransactionCommitOperationTimeout()); - sent.onComplete(new OnComplete() { + sent.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object success) { if (failure != null) { @@ -197,22 +199,36 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public void executeModification(final AbstractModification modification, final Boolean havePermit) { - LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), - modification.getClass().getSimpleName(), modification.getPath()); + void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) { + LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path); + executeModification(new DeleteModification(path), havePermit); + } + + @Override + void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { + LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path); + executeModification(new MergeModification(path, data), havePermit); + } + + @Override + void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { + LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path); + executeModification(new WriteModification(path, data), havePermit); + } + private void executeModification(final AbstractModification modification, final Boolean havePermit) { final boolean permitToRelease; if (havePermit == null) { permitToRelease = failedModification == null && acquireOperation(); } else { - permitToRelease = havePermit.booleanValue(); + permitToRelease = havePermit; } batchModification(modification, permitToRelease); } @Override - public void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture, + void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture, final Boolean havePermit) { LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath()); @@ -230,10 +246,10 @@ public class RemoteTransactionContext extends AbstractTransactionContext { // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. - final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue(); + final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit; sendBatchedModifications(); - OnComplete onComplete = new OnComplete() { + OnComplete onComplete = new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { // We have previously acquired an operation, now release it, no matter what happened @@ -279,7 +295,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public boolean usesOperationLimiting() { + boolean usesOperationLimiting() { return true; } }