X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractProxyTransaction.java;h=2fa98981be5bd3b2b58a324258a4eb741d0e5661;hp=0ba660234a15cee42c84f1c00b61d2ad2ef244e6;hb=047566574ea74d1dfe24fa8075f8ba137faa698c;hpb=d6ed0a044d591d65847714451d97d80345154089 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 0ba660234a..2fa98981be 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; +import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -34,6 +35,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionCommitSucc import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; @@ -314,14 +316,14 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } // This is a terminal request, hence we do not need to record it LOG.debug("Transaction {} abort completed", this); - parent.completeTransaction(this); + purge(); }); } @@ -347,14 +349,14 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.setException(((RequestFailure) t).getCause().unwrap()); } else { ret.setException(new IllegalStateException("Unhandled response " + t.getClass())); } // This is a terminal request, hence we do not need to record it LOG.debug("Transaction {} directCommit completed", this); - parent.completeTransaction(this); + purge(); }); return ret; @@ -378,7 +380,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } @@ -409,16 +411,31 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } - recordSuccessfulRequest(req); - LOG.debug("Transaction {} preCommit completed", this); + onPreCommitComplete(req); }); } + private void onPreCommitComplete(final TransactionRequest req) { + /* + * The backend has agreed that the transaction has entered PRE_COMMIT phase, meaning it will be committed + * to storage after the timeout completes. + * + * All state has been replicated to the backend, hence we do not need to keep it around. Retain only + * the precommit request, so we know which request to use for resync. + */ + LOG.debug("Transaction {} preCommit completed, clearing successfulRequests", this); + successfulRequests.clear(); + + // TODO: this works, but can contain some useless state (like batched operations). Create an empty + // equivalent of this request and store that. + recordSuccessfulRequest(req); + } + final void doCommit(final VotingFuture ret) { checkReadWrite(); checkSealed(); @@ -427,12 +444,22 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } LOG.debug("Transaction {} doCommit completed", this); + purge(); + }); + } + + void purge() { + successfulRequests.clear(); + + final TransactionRequest req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); + sendRequest(req, t -> { + LOG.debug("Transaction {} purge completed", this); parent.completeTransaction(this); }); } @@ -464,7 +491,7 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, null); + successor.handleForwardedRemoteRequest((TransactionRequest) obj, response -> { }); } else { Verify.verify(obj instanceof IncrementSequence); successor.incrementSequence(((IncrementSequence) obj).getDelta()); @@ -516,7 +543,7 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback) { + final void forwardRequest(final TransactionRequest request, final Consumer> callback) { final AbstractProxyTransaction successor = getSuccessorState().getSuccessor(); if (successor instanceof LocalProxyTransaction) { @@ -530,16 +557,15 @@ abstract class AbstractProxyTransaction implements Identifiable data); + abstract void doMerge(YangInstanceIdentifier path, NormalizedNode data); - abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode data); + abstract void doWrite(YangInstanceIdentifier path, NormalizedNode data); - abstract CheckedFuture doExists(final YangInstanceIdentifier path); + abstract CheckedFuture doExists(YangInstanceIdentifier path); - abstract CheckedFuture>, ReadFailedException> doRead( - final YangInstanceIdentifier path); + abstract CheckedFuture>, ReadFailedException> doRead(YangInstanceIdentifier path); abstract void doSeal(); @@ -575,4 +601,9 @@ abstract class AbstractProxyTransaction implements Identifiable request, Consumer> callback); + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString(); + } }