X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContext.java;h=ba1d05068b0ffc23b68097e8e4109baca1fbd9d6;hb=729e3f9606dae61f98bd0bca0cfb082c22e5b8d8;hp=27969b3e8ef405331c1e69fb1b9f912612d22538;hpb=123afd8b015173c459f4937c84eb2e91286f65a8;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index 27969b3e8e..ba1d05068b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -8,19 +8,28 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorSelection; import akka.dispatch.Futures; import akka.dispatch.OnComplete; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SettableFuture; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -34,7 +43,7 @@ import scala.concurrent.Future; public class RemoteTransactionContext extends AbstractTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class); - private final ActorContext actorContext; + private final ActorUtils actorUtils; private final ActorSelection actor; private final OperationLimiter limiter; @@ -51,19 +60,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext { private volatile Throwable failedModification; protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor, - final ActorContext actorContext, final short remoteTransactionVersion, final OperationLimiter limiter) { + final ActorUtils actorUtils, final short remoteTransactionVersion, final OperationLimiter limiter) { super(identifier, remoteTransactionVersion); - this.limiter = Preconditions.checkNotNull(limiter); + this.limiter = requireNonNull(limiter); this.actor = actor; - this.actorContext = actorContext; + this.actorUtils = actorUtils; } private ActorSelection getActor() { return actor; } - protected ActorContext getActorContext() { - return actorContext; + protected ActorUtils getActorUtils() { + return actorUtils; } @Override @@ -71,7 +80,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { LOG.debug("Tx {} closeTransaction called", getIdentifier()); TransactionContextCleanup.untrack(this); - actorContext.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable()); + actorUtils.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable()); } @Override @@ -80,11 +89,12 @@ public class RemoteTransactionContext extends AbstractTransactionContext { // Send the remaining batched modifications, if any, with the ready flag set. bumpPermits(havePermit); - return sendBatchedModifications(true, true); + return sendBatchedModifications(true, true, Optional.empty()); } @Override - public Future readyTransaction(final Boolean havePermit) { + public Future readyTransaction(final Boolean havePermit, + final Optional> participatingShardNames) { logModificationCount(); LOG.debug("Tx {} readyTransaction called", getIdentifier()); @@ -92,7 +102,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { // Send the remaining batched modifications, if any, with the ready flag set. bumpPermits(havePermit); - Future lastModificationsFuture = sendBatchedModifications(true, false); + Future lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames); return transformReadyReply(lastModificationsFuture); } @@ -107,7 +117,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { // Transform the last reply Future into a Future that returns the cohort actor path from // the last reply message. That's the end result of the ready operation. - return TransactionReadyReplyMapper.transform(readyReplyFuture, actorContext, getIdentifier()); + return TransactionReadyReplyMapper.transform(readyReplyFuture, actorUtils, getIdentifier()); } private BatchedModifications newBatchedModifications() { @@ -127,16 +137,17 @@ public class RemoteTransactionContext extends AbstractTransactionContext { batchedModifications.addModification(modification); if (batchedModifications.getModifications().size() - >= actorContext.getDatastoreContext().getShardBatchedModificationCount()) { + >= actorUtils.getDatastoreContext().getShardBatchedModificationCount()) { sendBatchedModifications(); } } protected Future sendBatchedModifications() { - return sendBatchedModifications(false, false); + return sendBatchedModifications(false, false, Optional.empty()); } - protected Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) { + protected Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady, + final Optional> participatingShardNames) { Future sent = null; if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) { if (batchedModifications == null) { @@ -146,7 +157,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext { LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(), batchedModifications.getModifications().size(), ready); - batchedModifications.setReady(ready); batchedModifications.setDoCommitOnReady(doCommitOnReady); batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent); @@ -155,6 +165,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext { batchPermits = 0; if (ready) { + batchedModifications.setReady(participatingShardNames); + batchedModifications.setDoCommitOnReady(doCommitOnReady); batchedModifications = null; } else { batchedModifications = newBatchedModifications(); @@ -170,9 +182,9 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } } - sent = actorContext.executeOperationAsync(getActor(), toSend.toSerializable(), - actorContext.getTransactionCommitOperationTimeout()); - sent.onComplete(new OnComplete() { + sent = actorUtils.executeOperationAsync(getActor(), toSend.toSerializable(), + actorUtils.getTransactionCommitOperationTimeout()); + sent.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object success) { if (failure != null) { @@ -183,17 +195,33 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } limiter.release(permitsToRelease); } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); } return sent; } @Override - public void executeModification(final AbstractModification modification, final Boolean havePermit) { - LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), - modification.getClass().getSimpleName(), modification.getPath()); + public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) { + LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path); + executeModification(new DeleteModification(path), havePermit); + } + + @Override + public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, + final Boolean havePermit) { + LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path); + executeModification(new MergeModification(path, data), havePermit); + } + + @Override + public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, + final Boolean havePermit) { + LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path); + executeModification(new WriteModification(path, data), havePermit); + } + private void executeModification(final AbstractModification modification, final Boolean havePermit) { final boolean permitToRelease; if (havePermit == null) { permitToRelease = failedModification == null && acquireOperation(); @@ -226,7 +254,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue(); sendBatchedModifications(); - OnComplete onComplete = new OnComplete() { + OnComplete onComplete = new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { // We have previously acquired an operation, now release it, no matter what happened @@ -235,8 +263,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } if (failure != null) { - LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(), - failure); + LOG.debug("Tx {} {} operation failed", getIdentifier(), readCmd.getClass().getSimpleName(), + failure); returnFuture.setException(new ReadFailedException("Error checking " + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure)); @@ -247,9 +275,9 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } }; - final Future future = actorContext.executeOperationAsync(getActor(), - readCmd.asVersion(getTransactionVersion()).toSerializable(), actorContext.getOperationTimeout()); - future.onComplete(onComplete, actorContext.getClientDispatcher()); + final Future future = actorUtils.executeOperationAsync(getActor(), + readCmd.asVersion(getTransactionVersion()).toSerializable(), actorUtils.getOperationTimeout()); + future.onComplete(onComplete, actorUtils.getClientDispatcher()); } /** @@ -259,7 +287,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { * @return True if a permit was successfully acquired, false otherwise */ private boolean acquireOperation() { - Preconditions.checkState(isOperationHandOffComplete(), + checkState(isOperationHandOffComplete(), "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff", getIdentifier(), actor);