package org.opendaylight.controller.cluster.databroker.actors.dds;
import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
if (t instanceof TransactionAbortSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
- ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+ ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
} else {
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
// This is a terminal request, hence we do not need to record it
LOG.debug("Transaction {} abort completed", this);
- parent.completeTransaction(this);
+ purge();
});
}
if (t instanceof TransactionCommitSuccess) {
ret.set(Boolean.TRUE);
} else if (t instanceof RequestFailure) {
- ret.setException(((RequestFailure<?, ?>) t).getCause());
+ ret.setException(((RequestFailure<?, ?>) t).getCause().unwrap());
} else {
ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
}
// This is a terminal request, hence we do not need to record it
LOG.debug("Transaction {} directCommit completed", this);
- parent.completeTransaction(this);
+ purge();
});
return ret;
if (t instanceof TransactionCanCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
- ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+ ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
} else {
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
if (t instanceof TransactionPreCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
- ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+ ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
} else {
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
- recordSuccessfulRequest(req);
- LOG.debug("Transaction {} preCommit completed", this);
+ onPreCommitComplete(req);
});
}
+ private void onPreCommitComplete(final TransactionRequest<?> req) {
+ /*
+ * The backend has agreed that the transaction has entered PRE_COMMIT phase, meaning it will be committed
+ * to storage after the timeout completes.
+ *
+ * All state has been replicated to the backend, hence we do not need to keep it around. Retain only
+ * the precommit request, so we know which request to use for resync.
+ */
+ LOG.debug("Transaction {} preCommit completed, clearing successfulRequests", this);
+ successfulRequests.clear();
+
+ // TODO: this works, but can contain some useless state (like batched operations). Create an empty
+ // equivalent of this request and store that.
+ recordSuccessfulRequest(req);
+ }
+
final void doCommit(final VotingFuture<?> ret) {
checkReadWrite();
checkSealed();
if (t instanceof TransactionCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
- ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+ ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
} else {
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
LOG.debug("Transaction {} doCommit completed", this);
+ purge();
+ });
+ }
+
+ void purge() {
+ successfulRequests.clear();
+
+ final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+ sendRequest(req, t -> {
+ LOG.debug("Transaction {} purge completed", this);
parent.completeTransaction(this);
});
}
for (Object obj : successfulRequests) {
if (obj instanceof TransactionRequest) {
LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
- successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
+ successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, response -> { });
} else {
Verify.verify(obj instanceof IncrementSequence);
successor.incrementSequence(((IncrementSequence) obj).getDelta());
* @param request Request to be forwarded
* @param callback Original callback
*/
- final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ final void forwardRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
final AbstractProxyTransaction successor = getSuccessorState().getSuccessor();
if (successor instanceof LocalProxyTransaction) {
abstract boolean isSnapshotOnly();
- abstract void doDelete(final YangInstanceIdentifier path);
+ abstract void doDelete(YangInstanceIdentifier path);
- abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+ abstract void doMerge(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
- abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+ abstract void doWrite(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
- abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
+ abstract CheckedFuture<Boolean, ReadFailedException> doExists(YangInstanceIdentifier path);
- abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(
- final YangInstanceIdentifier path);
+ abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
abstract void doSeal();
*/
abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
Consumer<Response<?, ?>> callback);
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();
+ }
}