X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendTransaction.java;h=8846467b59d2d924f783014215d947551a498f21;hb=583f30d1c7a8199b401c9393745c62fe27b5ced8;hp=9240aab26c22cb8402941d01a829ef68a6855d56;hpb=b00bee7547dbba0677347e991a8674f90752f6a2;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java index 9240aab26c..8846467b59 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java @@ -7,58 +7,30 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Optional; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Verify; -import com.google.common.primitives.UnsignedLong; -import com.google.common.util.concurrent.FutureCallback; import java.util.ArrayDeque; import java.util.Iterator; import java.util.Queue; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; -import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; -import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; -import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionDelete; -import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionMerge; -import org.opendaylight.controller.cluster.access.commands.TransactionModification; -import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opendaylight.yangtools.concepts.Identifiable; /** - * Frontend transaction state as observed by the shard leader. + * Frontend common transaction state as observed by the shard leader. * * @author Robert Varga */ @NotThreadSafe -final class FrontendTransaction { - private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class); - +abstract class FrontendTransaction implements Identifiable { private final AbstractFrontendHistory history; private final TransactionIdentifier id; @@ -72,35 +44,21 @@ final class FrontendTransaction { private Long lastPurgedSequence; private long expectedSequence; - private ReadWriteShardDataTreeTransaction openTransaction; - private DataTreeModification sealedModification; - private ShardDataTreeCohort readyCohort; - - private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, - final ReadWriteShardDataTreeTransaction transaction) { + FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id) { this.history = Preconditions.checkNotNull(history); this.id = Preconditions.checkNotNull(id); - this.openTransaction = Preconditions.checkNotNull(transaction); } - private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, - final DataTreeModification mod) { - this.history = Preconditions.checkNotNull(history); - this.id = Preconditions.checkNotNull(id); - this.sealedModification = Preconditions.checkNotNull(mod); + @Override + public final TransactionIdentifier getIdentifier() { + return id; } - static FrontendTransaction createOpen(final AbstractFrontendHistory history, - final ReadWriteShardDataTreeTransaction transaction) { - return new FrontendTransaction(history, transaction.getIdentifier(), transaction); + final AbstractFrontendHistory history() { + return history; } - static FrontendTransaction createReady(final AbstractFrontendHistory history, final TransactionIdentifier id, - final DataTreeModification mod) { - return new FrontendTransaction(history, id, mod); - } - - java.util.Optional> replaySequence(final long sequence) throws RequestException { + final java.util.Optional> replaySequence(final long sequence) throws RequestException { // Fast path check: if the requested sequence is the next request, bail early if (expectedSequence == sequence) { return java.util.Optional.empty(); @@ -144,36 +102,15 @@ final class FrontendTransaction { return java.util.Optional.empty(); } - void purgeSequencesUpTo(final long sequence) { + final void purgeSequencesUpTo(final long sequence) { // FIXME: implement this lastPurgedSequence = sequence; } // Sequence has already been checked - @Nullable TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope, - final long now) throws RequestException { - if (request instanceof ModifyTransactionRequest) { - return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now); - } else if (request instanceof CommitLocalTransactionRequest) { - handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now); - return null; - } else if (request instanceof ExistsTransactionRequest) { - return handleExistsTransaction((ExistsTransactionRequest) request); - } else if (request instanceof ReadTransactionRequest) { - return handleReadTransaction((ReadTransactionRequest) request); - } else if (request instanceof TransactionPreCommitRequest) { - handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now); - return null; - } else if (request instanceof TransactionDoCommitRequest) { - handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); - return null; - } else if (request instanceof TransactionAbortRequest) { - return handleTransactionAbort((TransactionAbortRequest) request, envelope, now); - } else { - throw new UnsupportedRequestException(request); - } - } + abstract @Nullable TransactionSuccess handleRequest(TransactionRequest request, + RequestEnvelope envelope, long now) throws RequestException; private void recordResponse(final long sequence, final Object response) { if (replayQueue.isEmpty()) { @@ -183,7 +120,7 @@ final class FrontendTransaction { expectedSequence++; } - private > T recordSuccess(final long sequence, final T success) { + final > T recordSuccess(final long sequence, final T success) { recordResponse(sequence, success); return success; } @@ -192,215 +129,24 @@ final class FrontendTransaction { return history.readTime() - startTime; } - private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime, + final void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime, final TransactionSuccess success) { recordResponse(success.getSequence(), success); envelope.sendSuccess(success, executionTime(startTime)); } - private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime, + final void recordAndSendFailure(final RequestEnvelope envelope, final long startTime, final RuntimeRequestException failure) { recordResponse(envelope.getMessage().getSequence(), failure); envelope.sendFailure(failure, executionTime(startTime)); } - private void handleTransactionPreCommit(final TransactionPreCommitRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - readyCohort.preCommit(new FutureCallback() { - @Override - public void onSuccess(final DataTreeCandidate result) { - recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(), - request.getSequence())); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure)); - readyCohort = null; - } - }); - } - - private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope, - final long now) throws RequestException { - readyCohort.commit(new FutureCallback() { - @Override - public void onSuccess(final UnsignedLong result) { - successfulCommit(envelope, now); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure)); - readyCohort = null; - } - }); - } - - private TransactionSuccess handleTransactionAbort(final TransactionAbortRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - if (readyCohort == null) { - openTransaction.abort(); - return new TransactionAbortSuccess(id, request.getSequence()); - } - - readyCohort.abort(new FutureCallback() { - @Override - public void onSuccess(final Void result) { - readyCohort = null; - recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence())); - LOG.debug("Transaction {} aborted", id); - } - - @Override - public void onFailure(final Throwable failure) { - readyCohort = null; - LOG.warn("Transaction {} abort failed", id, failure); - recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure)); - } - }); - return null; - } - - private void coordinatedCommit(final RequestEnvelope envelope, final long now) { - readyCohort.canCommit(new FutureCallback() { - @Override - public void onSuccess(final Void result) { - recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(), - envelope.getMessage().getSequence())); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); - readyCohort = null; - } - }); - } - - private void directCommit(final RequestEnvelope envelope, final long now) { - readyCohort.canCommit(new FutureCallback() { - @Override - public void onSuccess(final Void result) { - successfulDirectCanCommit(envelope, now); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); - readyCohort = null; - } - }); - - } - - private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { - readyCohort.preCommit(new FutureCallback() { - @Override - public void onSuccess(final DataTreeCandidate result) { - successfulDirectPreCommit(envelope, startTime); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure)); - readyCohort = null; - } - }); + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier()) + .add("expectedSequence", expectedSequence).add("firstReplaySequence", firstReplaySequence) + .add("lastPurgedSequence", lastPurgedSequence) + .toString(); } - private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { - readyCohort.commit(new FutureCallback() { - - @Override - public void onSuccess(final UnsignedLong result) { - successfulCommit(envelope, startTime); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure)); - readyCohort = null; - } - }); - } - - private void successfulCommit(final RequestEnvelope envelope, final long startTime) { - recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(), - envelope.getMessage().getSequence())); - readyCohort = null; - } - - private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - if (sealedModification.equals(request.getModification())) { - readyCohort = history.createReadyCohort(id, sealedModification); - - if (request.isCoordinated()) { - coordinatedCommit(envelope, now); - } else { - directCommit(envelope, now); - } - } else { - throw new UnsupportedRequestException(request); - } - } - - private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) - throws RequestException { - final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); - return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(), - data.isPresent())); - } - - private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) - throws RequestException { - final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); - return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data)); - } - - private ModifyTransactionSuccess replyModifySuccess(final long sequence) { - return recordSuccess(sequence, new ModifyTransactionSuccess(id, sequence)); - } - - private @Nullable TransactionSuccess handleModifyTransaction(final ModifyTransactionRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - - final DataTreeModification modification = openTransaction.getSnapshot(); - for (TransactionModification m : request.getModifications()) { - if (m instanceof TransactionDelete) { - modification.delete(m.getPath()); - } else if (m instanceof TransactionWrite) { - modification.write(m.getPath(), ((TransactionWrite) m).getData()); - } else if (m instanceof TransactionMerge) { - modification.merge(m.getPath(), ((TransactionMerge) m).getData()); - } else { - LOG.warn("{}: ignoring unhandled modification {}", history.persistenceId(), m); - } - } - - final java.util.Optional maybeProto = request.getPersistenceProtocol(); - if (!maybeProto.isPresent()) { - return replyModifySuccess(request.getSequence()); - } - - switch (maybeProto.get()) { - case ABORT: - openTransaction.abort(); - openTransaction = null; - return replyModifySuccess(request.getSequence()); - case SIMPLE: - readyCohort = openTransaction.ready(); - openTransaction = null; - directCommit(envelope, now); - return null; - case THREE_PHASE: - readyCohort = openTransaction.ready(); - openTransaction = null; - coordinatedCommit(envelope, now); - return null; - default: - throw new UnsupportedRequestException(request); - } - } }