*/
package org.opendaylight.controller.cluster.datastore;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import java.util.Optional;
+import java.util.SortedSet;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
*
* @author Thomas Pantelis
*/
-public class RemoteTransactionContext extends AbstractTransactionContext {
+final class RemoteTransactionContext extends TransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class);
- private final ActorContext actorContext;
+ private final ActorUtils actorUtils;
private final ActorSelection actor;
- private final boolean isTxActorLocal;
- private final short remoteTransactionVersion;
private final OperationLimiter limiter;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
+ private int batchPermits;
- protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
- ActorContext actorContext, boolean isTxActorLocal,
- short remoteTransactionVersion, OperationLimiter limiter) {
- super(identifier);
- this.limiter = Preconditions.checkNotNull(limiter);
- this.actor = actor;
- this.actorContext = actorContext;
- this.isTxActorLocal = isTxActorLocal;
- this.remoteTransactionVersion = remoteTransactionVersion;
- }
+ /**
+ * We have observed a failed modification batch. This transaction context is effectively doomed, as the backend
+ * does not have a correct view of the world. If this happens, we do not limit operations but rather short-cut them
+ * to a either a no-op (modifications) or a failure (reads). Once the transaction is ready, though, we send the
+ * message to resynchronize with the backend, sharing a 'lost message' failure path.
+ */
+ private volatile Throwable failedModification;
- private Future<Object> completeOperation(Future<Object> operationFuture){
- operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
- return operationFuture;
+ RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor,
+ final ActorUtils actorUtils, final short remoteTransactionVersion, final OperationLimiter limiter) {
+ super(identifier, remoteTransactionVersion);
+ this.limiter = requireNonNull(limiter);
+ this.actor = actor;
+ this.actorUtils = actorUtils;
}
private ActorSelection getActor() {
return actor;
}
- protected ActorContext getActorContext() {
- return actorContext;
- }
-
- protected short getRemoteTransactionVersion() {
- return remoteTransactionVersion;
- }
-
- protected Future<Object> executeOperationAsync(SerializableMessage msg) {
- return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+ protected ActorUtils getActorUtils() {
+ return actorUtils;
}
@Override
- public void closeTransaction() {
+ void closeTransaction() {
LOG.debug("Tx {} closeTransaction called", getIdentifier());
TransactionContextCleanup.untrack(this);
- actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
- }
-
- @Override
- public boolean supportsDirectCommit() {
- return true;
+ actorUtils.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable());
}
@Override
- public Future<Object> directCommit() {
+ Future<Object> directCommit(final Boolean havePermit) {
LOG.debug("Tx {} directCommit called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
-
- return sendBatchedModifications(true, true);
+ bumpPermits(havePermit);
+ return sendBatchedModifications(true, true, Optional.empty());
}
@Override
- public Future<ActorSelection> readyTransaction() {
+ Future<ActorSelection> readyTransaction(final Boolean havePermit,
+ final Optional<SortedSet<String>> participatingShardNames) {
logModificationCount();
LOG.debug("Tx {} readyTransaction called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
- Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
-
- return transformReadyReply(lastModificationsFuture);
- }
+ bumpPermits(havePermit);
+ Future<Object> lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames);
- protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
// Transform the last reply Future into a Future that returns the cohort actor path from
// the last reply message. That's the end result of the ready operation.
+ return TransactionReadyReplyMapper.transform(lastModificationsFuture, actorUtils, getIdentifier());
+ }
- return TransactionReadyReplyMapper.transform(readyReplyFuture, actorContext, getIdentifier());
+ private void bumpPermits(final Boolean havePermit) {
+ if (Boolean.TRUE.equals(havePermit)) {
+ ++batchPermits;
+ }
}
private BatchedModifications newBatchedModifications() {
- return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, getIdentifier().getChainId());
+ return new BatchedModifications(getIdentifier(), getTransactionVersion());
}
- private void batchModification(Modification modification) {
+ private void batchModification(final Modification modification, final boolean havePermit) {
incrementModificationCount();
- if(batchedModifications == null) {
+ if (havePermit) {
+ ++batchPermits;
+ }
+
+ if (batchedModifications == null) {
batchedModifications = newBatchedModifications();
}
batchedModifications.addModification(modification);
- if(batchedModifications.getModifications().size() >=
- actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
+ if (batchedModifications.getModifications().size()
+ >= actorUtils.getDatastoreContext().getShardBatchedModificationCount()) {
sendBatchedModifications();
}
}
- protected Future<Object> sendBatchedModifications() {
- return sendBatchedModifications(false, false);
+ @VisibleForTesting
+ Future<Object> sendBatchedModifications() {
+ return sendBatchedModifications(false, false, Optional.empty());
}
- protected Future<Object> sendBatchedModifications(boolean ready, boolean doCommitOnReady) {
+ private Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady,
+ final Optional<SortedSet<String>> participatingShardNames) {
Future<Object> sent = null;
- if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
- if(batchedModifications == null) {
+ if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
+ if (batchedModifications == null) {
batchedModifications = newBatchedModifications();
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
- batchedModifications.getModifications().size(), ready);
- }
+ LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
+ batchedModifications.getModifications().size(), ready);
- batchedModifications.setReady(ready);
batchedModifications.setDoCommitOnReady(doCommitOnReady);
batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
- sent = executeOperationAsync(batchedModifications);
- if(ready) {
+ final BatchedModifications toSend = batchedModifications;
+ final int permitsToRelease = batchPermits;
+ batchPermits = 0;
+
+ if (ready) {
+ batchedModifications.setReady(participatingShardNames);
+ batchedModifications.setDoCommitOnReady(doCommitOnReady);
batchedModifications = null;
} else {
batchedModifications = newBatchedModifications();
+
+ final Throwable failure = failedModification;
+ if (failure != null) {
+ // We have observed a modification failure, it does not make sense to send this batch. This speeds
+ // up the time when the application could be blocked due to messages timing out and operation
+ // limiter kicking in.
+ LOG.debug("Tx {} modifications previously failed, not sending a non-ready batch", getIdentifier());
+ limiter.release(permitsToRelease);
+ return Futures.failed(failure);
+ }
}
+
+ sent = actorUtils.executeOperationAsync(getActor(), toSend.toSerializable(),
+ actorUtils.getTransactionCommitOperationTimeout());
+ sent.onComplete(new OnComplete<>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) {
+ if (failure != null) {
+ LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
+ failedModification = failure;
+ } else {
+ LOG.debug("Tx {} modifications completed with {}", getIdentifier(), success);
+ }
+ limiter.release(permitsToRelease);
+ }
+ }, actorUtils.getClientDispatcher());
}
return sent;
}
@Override
- public void deleteData(YangInstanceIdentifier path) {
- LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
-
- acquireOperation();
- batchModification(new DeleteModification(path));
+ void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
+ LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
+ executeModification(new DeleteModification(path), havePermit);
}
@Override
- public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
-
- acquireOperation();
- batchModification(new MergeModification(path, data));
+ void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
+ LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
+ executeModification(new MergeModification(path, data), havePermit);
}
@Override
- public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
-
- acquireOperation();
- batchModification(new WriteModification(path, data));
+ void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
+ LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
+ executeModification(new WriteModification(path, data), havePermit);
}
- @Override
- public void readData(final YangInstanceIdentifier path,
- final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
-
- LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
-
- // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
- // public API contract.
-
- acquireOperation();
- sendBatchedModifications();
-
- OnComplete<Object> onComplete = new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object readResponse) throws Throwable {
- if(failure != null) {
- LOG.debug("Tx {} read operation failed: {}", getIdentifier(), failure);
- returnFuture.setException(new ReadFailedException(
- "Error reading data for path " + path, failure));
-
- } else {
- LOG.debug("Tx {} read operation succeeded", getIdentifier(), failure);
-
- if (readResponse instanceof ReadDataReply) {
- ReadDataReply reply = (ReadDataReply) readResponse;
- returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
-
- } else if (ReadDataReply.isSerializedType(readResponse)) {
- ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
- returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
-
- } else {
- returnFuture.setException(new ReadFailedException(
- "Invalid response reading data for path " + path));
- }
- }
- }
- };
-
- Future<Object> readFuture = executeOperationAsync(new ReadData(path));
+ private void executeModification(final AbstractModification modification, final Boolean havePermit) {
+ final boolean permitToRelease;
+ if (havePermit == null) {
+ permitToRelease = failedModification == null && acquireOperation();
+ } else {
+ permitToRelease = havePermit;
+ }
- readFuture.onComplete(onComplete, actorContext.getClientDispatcher());
+ batchModification(modification, permitToRelease);
}
@Override
- public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> returnFuture) {
-
- LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
+ <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture,
+ final Boolean havePermit) {
+ LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+ readCmd.getPath());
+
+ final Throwable failure = failedModification;
+ if (failure != null) {
+ // If we know there was a previous modification failure, we must not send a read request, as it risks
+ // returning incorrect data. We check this before acquiring an operation simply because we want the app
+ // to complete this transaction as soon as possible.
+ returnFuture.setException(new ReadFailedException("Previous modification failed, cannot "
+ + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
+ return;
+ }
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
- acquireOperation();
+ final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit;
sendBatchedModifications();
- OnComplete<Object> onComplete = new OnComplete<Object>() {
+ OnComplete<Object> onComplete = new OnComplete<>() {
@Override
- public void onComplete(Throwable failure, Object response) throws Throwable {
- if(failure != null) {
- LOG.debug("Tx {} dataExists operation failed: {}", getIdentifier(), failure);
- returnFuture.setException(new ReadFailedException(
- "Error checking data exists for path " + path, failure));
- } else {
- LOG.debug("Tx {} dataExists operation succeeded", getIdentifier(), failure);
-
- if (response instanceof DataExistsReply) {
- returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
+ public void onComplete(final Throwable failure, final Object response) {
+ // We have previously acquired an operation, now release it, no matter what happened
+ if (permitToRelease) {
+ limiter.release();
+ }
- } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
- returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
+ if (failure != null) {
+ LOG.debug("Tx {} {} operation failed", getIdentifier(), readCmd.getClass().getSimpleName(),
+ failure);
- } else {
- returnFuture.setException(new ReadFailedException(
- "Invalid response checking exists for path " + path));
- }
+ returnFuture.setException(new ReadFailedException("Error checking "
+ + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
+ } else {
+ LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName());
+ readCmd.processResponse(response, returnFuture);
}
}
};
- Future<Object> future = executeOperationAsync(new DataExists(path));
-
- future.onComplete(onComplete, actorContext.getClientDispatcher());
+ final Future<Object> future = actorUtils.executeOperationAsync(getActor(),
+ readCmd.asVersion(getTransactionVersion()).toSerializable(), actorUtils.getOperationTimeout());
+ future.onComplete(onComplete, actorUtils.getClientDispatcher());
}
/**
- * Acquire operation from the limiter if the hand-off has completed. If
- * the hand-off is still ongoing, this method does nothing.
+ * Acquire operation from the limiter if the hand-off has completed. If the hand-off is still ongoing, this method
+ * does nothing.
+ *
+ * @return True if a permit was successfully acquired, false otherwise
*/
- private final void acquireOperation() {
- if (isOperationHandOffComplete()) {
- limiter.acquire();
+ private boolean acquireOperation() {
+ checkState(isOperationHandOffComplete(),
+ "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff",
+ getIdentifier(), actor);
+
+ if (limiter.acquire()) {
+ return true;
}
+
+ LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), actor);
+ return false;
}
@Override
- public boolean usesOperationLimiting() {
+ boolean usesOperationLimiting() {
return true;
}
}