package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
-import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
+ private int batchPermits;
+
+ /**
+ * We have observed a failed modification batch. This transaction context is effectively doomed, as the backend
+ * does not have a correct view of the world. If this happens, we do not limit operations but rather short-cut them
+ * to a either a no-op (modifications) or a failure (reads). Once the transaction is ready, though, we send the
+ * message to resynchronize with the backend, sharing a 'lost message' failure path.
+ */
+ private volatile Throwable failedModification;
protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
this.actorContext = actorContext;
}
- private Future<Object> completeOperation(Future<Object> operationFuture) {
- operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
- return operationFuture;
- }
-
private ActorSelection getActor() {
return actor;
}
return actorContext;
}
- protected Future<Object> executeOperationAsync(SerializableMessage msg, Timeout timeout) {
- return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable(), timeout));
- }
-
@Override
public void closeTransaction() {
LOG.debug("Tx {} closeTransaction called", getIdentifier());
return new BatchedModifications(getIdentifier(), getTransactionVersion());
}
- private void batchModification(Modification modification) {
+ private void batchModification(Modification modification, boolean havePermit) {
incrementModificationCount();
+ if (havePermit) {
+ ++batchPermits;
+ }
+
if (batchedModifications == null) {
batchedModifications = newBatchedModifications();
}
batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
final BatchedModifications toSend = batchedModifications;
+ final int permitsToRelease = batchPermits;
+ batchPermits = 0;
+
if (ready) {
batchedModifications = null;
} else {
batchedModifications = newBatchedModifications();
+
+ final Throwable failure = failedModification;
+ if (failure != null) {
+ // We have observed a modification failure, it does not make sense to send this batch. This speeds
+ // up the time when the application could be blocked due to messages timing out and operation
+ // limiter kicking in.
+ LOG.debug("Tx {} modifications previously failed, not sending a non-ready batch", getIdentifier());
+ limiter.release(permitsToRelease);
+ return Futures.failed(failure);
+ }
}
- sent = executeOperationAsync(toSend, actorContext.getTransactionCommitOperationTimeout());
+ sent = actorContext.executeOperationAsync(getActor(), toSend.toSerializable(),
+ actorContext.getTransactionCommitOperationTimeout());
+ sent.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object success) {
+ if (failure != null) {
+ LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
+ failedModification = failure;
+ } else {
+ LOG.debug("Tx {} modifications completed with {}", getIdentifier(), success);
+ }
+ limiter.release(permitsToRelease);
+ }
+ }, actorContext.getClientDispatcher());
}
return sent;
LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
modification.getClass().getSimpleName(), modification.getPath());
- acquireOperation();
- batchModification(modification);
+ final boolean havePermit = failedModification == null && acquireOperation();
+ batchModification(modification, havePermit);
}
@Override
LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
readCmd.getPath());
+ final Throwable failure = failedModification;
+ if (failure != null) {
+ // If we know there was a previous modification failure, we must not send a read request, as it risks
+ // returning incorrect data. We check this before acquiring an operation simply because we want the app
+ // to complete this transaction as soon as possible.
+ returnFuture.setException(new ReadFailedException("Previous modification failed, cannot "
+ + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
+ return;
+ }
+
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
- acquireOperation();
+ final boolean havePermit = acquireOperation();
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object response) throws Throwable {
+ public void onComplete(Throwable failure, Object response) {
+ // We have previously acquired an operation, now release it, no matter what happened
+ if (havePermit) {
+ limiter.release();
+ }
+
if (failure != null) {
LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(),
failure);
}
};
- Future<Object> future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()),
- actorContext.getOperationTimeout());
-
+ final Future<Object> future = actorContext.executeOperationAsync(getActor(),
+ readCmd.asVersion(getTransactionVersion()).toSerializable(), actorContext.getOperationTimeout());
future.onComplete(onComplete, actorContext.getClientDispatcher());
}
/**
- * Acquire operation from the limiter if the hand-off has completed. If
- * the hand-off is still ongoing, this method does nothing.
+ * Acquire operation from the limiter if the hand-off has completed. If the hand-off is still ongoing, this method
+ * does nothing.
+ *
+ * @return True if a permit was successfully acquired, false otherwise
*/
- private void acquireOperation() {
- if (isOperationHandOffComplete()) {
- limiter.acquire();
- }
+ private boolean acquireOperation() {
+ return isOperationHandOffComplete() && limiter.acquire();
}
@Override