X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractProxyTransaction.java;h=83ba07b69ac8f6e3ed253d18e420315065069e4d;hb=cd801d3b254bf709903b1fd31379967ab8ac1f36;hp=e6313d3cfd68f052b9e51582ac6e280983b92b99;hpb=b66d5a3c59525a1c7885c3d653d9657a99f4103d;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index e6313d3cfd..83ba07b69a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; +import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -27,6 +28,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -34,6 +36,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionCommitSucc import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; @@ -314,14 +317,14 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } // This is a terminal request, hence we do not need to record it LOG.debug("Transaction {} abort completed", this); - parent.completeTransaction(this); + purge(); }); } @@ -347,14 +350,14 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.setException(((RequestFailure) t).getCause().unwrap()); } else { ret.setException(new IllegalStateException("Unhandled response " + t.getClass())); } // This is a terminal request, hence we do not need to record it LOG.debug("Transaction {} directCommit completed", this); - parent.completeTransaction(this); + purge(); }); return ret; @@ -378,7 +381,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } @@ -409,16 +412,31 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } - recordSuccessfulRequest(req); - LOG.debug("Transaction {} preCommit completed", this); + onPreCommitComplete(req); }); } + private void onPreCommitComplete(final TransactionRequest req) { + /* + * The backend has agreed that the transaction has entered PRE_COMMIT phase, meaning it will be committed + * to storage after the timeout completes. + * + * All state has been replicated to the backend, hence we do not need to keep it around. Retain only + * the precommit request, so we know which request to use for resync. + */ + LOG.debug("Transaction {} preCommit completed, clearing successfulRequests", this); + successfulRequests.clear(); + + // TODO: this works, but can contain some useless state (like batched operations). Create an empty + // equivalent of this request and store that. + recordSuccessfulRequest(req); + } + final void doCommit(final VotingFuture ret) { checkReadWrite(); checkSealed(); @@ -427,12 +445,22 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause()); + ret.voteNo(((RequestFailure) t).getCause().unwrap()); } else { ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); } LOG.debug("Transaction {} doCommit completed", this); + purge(); + }); + } + + void purge() { + successfulRequests.clear(); + + final TransactionRequest req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); + sendRequest(req, t -> { + LOG.debug("Transaction {} purge completed", this); parent.completeTransaction(this); }); } @@ -464,7 +492,7 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, null); + successor.replay((TransactionRequest) obj, response -> { }); } else { Verify.verify(obj instanceof IncrementSequence); successor.incrementSequence(((IncrementSequence) obj).getDelta()); @@ -481,8 +509,8 @@ abstract class AbstractProxyTransaction implements Identifiable) req, e.getCallback()); + LOG.debug("Forwarding queued request {} to successor {}", req, successor); + successor.replay((TransactionRequest) req, e.getCallback()); it.remove(); } } @@ -500,6 +528,24 @@ abstract class AbstractProxyTransaction implements Identifiable + * Note: this method is invoked by the predecessor on the successor. + * + * @param request Request which needs to be forwarded + * @param callback Callback to be invoked once the request completes + */ + private void replay(TransactionRequest request, Consumer> callback) { + if (request instanceof AbstractLocalTransactionRequest) { + handleForwardedLocalRequest((AbstractLocalTransactionRequest) request, callback); + } else { + handleForwardedRemoteRequest(request, callback); + } + } + // Called with the connection locked final void finishReconnect() { final SuccessorState local = getSuccessorState(); @@ -516,7 +562,7 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback) { + final void forwardRequest(final TransactionRequest request, final Consumer> callback) { final AbstractProxyTransaction successor = getSuccessorState().getSuccessor(); if (successor instanceof LocalProxyTransaction) { @@ -550,9 +596,19 @@ abstract class AbstractProxyTransaction implements Identifiable commitRequest(boolean coordinated); /** - * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is - * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all - * operations are packaged in the message. + * Replay a request originating in this proxy to a successor remote proxy. + */ + abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest request, + Consumer> callback); + + /** + * Replay a request originating in this proxy to a successor local proxy. + */ + abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest request, + Consumer> callback); + + /** + * Invoked from {@link LocalProxyTransaction} when it replays its successful requests to its successor. * *

* Note: this method is invoked by the predecessor on the successor. @@ -560,18 +616,23 @@ abstract class AbstractProxyTransaction implements Identifiable request, + abstract void handleForwardedLocalRequest(AbstractLocalTransactionRequest request, @Nullable Consumer> callback); /** - * Replay a request originating in this proxy to a successor remote proxy. + * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. + * + *

+ * Note: this method is invoked by the predecessor on the successor. + * + * @param request Request which needs to be forwarded + * @param callback Callback to be invoked once the request completes */ - abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest request, - Consumer> callback); + abstract void handleForwardedRemoteRequest(TransactionRequest request, + @Nullable Consumer> callback); - /** - * Replay a request originating in this proxy to a successor local proxy. - */ - abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest request, - Consumer> callback); + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString(); + } }