X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContext.java;h=ade9c375e5cfe93735c712fb8f1650125d9948d9;hp=b7c170568847cc701cce421b63a79633f4653226;hb=82be660900ab9bdd84941a0c3498c1ae36982aba;hpb=634dfac8eead60f443bf75e749c70d1f2bb29198 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index b7c1705688..ade9c375e5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -8,10 +8,13 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorSelection; import akka.dispatch.Futures; import akka.dispatch.OnComplete; -import com.google.common.base.Preconditions; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; import java.util.Optional; import java.util.SortedSet; @@ -20,9 +23,14 @@ import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -33,10 +41,10 @@ import scala.concurrent.Future; * * @author Thomas Pantelis */ -public class RemoteTransactionContext extends AbstractTransactionContext { +final class RemoteTransactionContext extends TransactionContext { private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class); - private final ActorContext actorContext; + private final ActorUtils actorUtils; private final ActorSelection actor; private final OperationLimiter limiter; @@ -52,32 +60,32 @@ public class RemoteTransactionContext extends AbstractTransactionContext { */ private volatile Throwable failedModification; - protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor, - final ActorContext actorContext, final short remoteTransactionVersion, final OperationLimiter limiter) { + RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor, + final ActorUtils actorUtils, final short remoteTransactionVersion, final OperationLimiter limiter) { super(identifier, remoteTransactionVersion); - this.limiter = Preconditions.checkNotNull(limiter); + this.limiter = requireNonNull(limiter); this.actor = actor; - this.actorContext = actorContext; + this.actorUtils = actorUtils; } private ActorSelection getActor() { return actor; } - protected ActorContext getActorContext() { - return actorContext; + protected ActorUtils getActorUtils() { + return actorUtils; } @Override - public void closeTransaction() { + void closeTransaction() { LOG.debug("Tx {} closeTransaction called", getIdentifier()); TransactionContextCleanup.untrack(this); - actorContext.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable()); + actorUtils.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable()); } @Override - public Future directCommit(final Boolean havePermit) { + Future directCommit(final Boolean havePermit) { LOG.debug("Tx {} directCommit called", getIdentifier()); // Send the remaining batched modifications, if any, with the ready flag set. @@ -86,7 +94,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public Future readyTransaction(final Boolean havePermit, + Future readyTransaction(final Boolean havePermit, final Optional> participatingShardNames) { logModificationCount(); @@ -97,7 +105,9 @@ public class RemoteTransactionContext extends AbstractTransactionContext { bumpPermits(havePermit); Future lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames); - return transformReadyReply(lastModificationsFuture); + // Transform the last reply Future into a Future that returns the cohort actor path from + // the last reply message. That's the end result of the ready operation. + return TransactionReadyReplyMapper.transform(lastModificationsFuture, actorUtils, getIdentifier()); } private void bumpPermits(final Boolean havePermit) { @@ -106,13 +116,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } } - protected Future transformReadyReply(final Future readyReplyFuture) { - // Transform the last reply Future into a Future that returns the cohort actor path from - // the last reply message. That's the end result of the ready operation. - - return TransactionReadyReplyMapper.transform(readyReplyFuture, actorContext, getIdentifier()); - } - private BatchedModifications newBatchedModifications() { return new BatchedModifications(getIdentifier(), getTransactionVersion()); } @@ -130,16 +133,17 @@ public class RemoteTransactionContext extends AbstractTransactionContext { batchedModifications.addModification(modification); if (batchedModifications.getModifications().size() - >= actorContext.getDatastoreContext().getShardBatchedModificationCount()) { + >= actorUtils.getDatastoreContext().getShardBatchedModificationCount()) { sendBatchedModifications(); } } - protected Future sendBatchedModifications() { + @VisibleForTesting + Future sendBatchedModifications() { return sendBatchedModifications(false, false, Optional.empty()); } - protected Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady, + private Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady, final Optional> participatingShardNames) { Future sent = null; if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) { @@ -175,9 +179,9 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } } - sent = actorContext.executeOperationAsync(getActor(), toSend.toSerializable(), - actorContext.getTransactionCommitOperationTimeout()); - sent.onComplete(new OnComplete() { + sent = actorUtils.executeOperationAsync(getActor(), toSend.toSerializable(), + actorUtils.getTransactionCommitOperationTimeout()); + sent.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object success) { if (failure != null) { @@ -188,29 +192,43 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } limiter.release(permitsToRelease); } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); } return sent; } @Override - public void executeModification(final AbstractModification modification, final Boolean havePermit) { - LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), - modification.getClass().getSimpleName(), modification.getPath()); + void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) { + LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path); + executeModification(new DeleteModification(path), havePermit); + } + + @Override + void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { + LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path); + executeModification(new MergeModification(path, data), havePermit); + } + + @Override + void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { + LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path); + executeModification(new WriteModification(path, data), havePermit); + } + private void executeModification(final AbstractModification modification, final Boolean havePermit) { final boolean permitToRelease; if (havePermit == null) { permitToRelease = failedModification == null && acquireOperation(); } else { - permitToRelease = havePermit.booleanValue(); + permitToRelease = havePermit; } batchModification(modification, permitToRelease); } @Override - public void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture, + void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture, final Boolean havePermit) { LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath()); @@ -228,10 +246,10 @@ public class RemoteTransactionContext extends AbstractTransactionContext { // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. - final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue(); + final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit; sendBatchedModifications(); - OnComplete onComplete = new OnComplete() { + OnComplete onComplete = new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { // We have previously acquired an operation, now release it, no matter what happened @@ -252,9 +270,9 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } }; - final Future future = actorContext.executeOperationAsync(getActor(), - readCmd.asVersion(getTransactionVersion()).toSerializable(), actorContext.getOperationTimeout()); - future.onComplete(onComplete, actorContext.getClientDispatcher()); + final Future future = actorUtils.executeOperationAsync(getActor(), + readCmd.asVersion(getTransactionVersion()).toSerializable(), actorUtils.getOperationTimeout()); + future.onComplete(onComplete, actorUtils.getClientDispatcher()); } /** @@ -264,7 +282,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { * @return True if a permit was successfully acquired, false otherwise */ private boolean acquireOperation() { - Preconditions.checkState(isOperationHandOffComplete(), + checkState(isOperationHandOffComplete(), "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff", getIdentifier(), actor); @@ -277,7 +295,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public boolean usesOperationLimiting() { + boolean usesOperationLimiting() { return true; } }