X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendTransaction.java;h=94093195c9b753e9f48646ce313de89f046ede58;hb=b5cb353e3553a39f576c284119af75ffa5ea66a9;hp=a825a4ddee58842318270aaa94269e7c2b37e5bf;hpb=8fdce17243f8d71a053704e93cd5fe43fefc1038;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java index a825a4ddee..94093195c9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -7,56 +7,34 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Optional; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Verify; -import com.google.common.primitives.UnsignedLong; -import com.google.common.util.concurrent.FutureCallback; import java.util.ArrayDeque; import java.util.Iterator; import java.util.Queue; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; -import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceSuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; -import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; -import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionDelete; -import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionMerge; -import org.opendaylight.controller.cluster.access.commands.TransactionModification; -import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.concepts.Identifiable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Frontend transaction state as observed by the shard leader. + * Frontend common transaction state as observed by the shard leader. * * @author Robert Varga */ @NotThreadSafe -final class FrontendTransaction { +abstract class FrontendTransaction implements Identifiable { private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class); private final AbstractFrontendHistory history; @@ -72,36 +50,27 @@ final class FrontendTransaction { private Long lastPurgedSequence; private long expectedSequence; - private ReadWriteShardDataTreeTransaction openTransaction; - private ModifyTransactionSuccess cachedModifySuccess; - private DataTreeModification sealedModification; - private ShardDataTreeCohort readyCohort; + private RequestException previousFailure; - private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, - final ReadWriteShardDataTreeTransaction transaction) { + FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id) { this.history = Preconditions.checkNotNull(history); this.id = Preconditions.checkNotNull(id); - this.openTransaction = Preconditions.checkNotNull(transaction); } - private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, - final DataTreeModification mod) { - this.history = Preconditions.checkNotNull(history); - this.id = Preconditions.checkNotNull(id); - this.sealedModification = Preconditions.checkNotNull(mod); + @Override + public final TransactionIdentifier getIdentifier() { + return id; } - static FrontendTransaction createOpen(final AbstractFrontendHistory history, - final ReadWriteShardDataTreeTransaction transaction) { - return new FrontendTransaction(history, transaction.getIdentifier(), transaction); + final AbstractFrontendHistory history() { + return history; } - static FrontendTransaction createReady(final AbstractFrontendHistory history, final TransactionIdentifier id, - final DataTreeModification mod) { - return new FrontendTransaction(history, id, mod); + final String persistenceId() { + return history().persistenceId(); } - java.util.Optional> replaySequence(final long sequence) throws RequestException { + final java.util.Optional> replaySequence(final long sequence) throws RequestException { // Fast path check: if the requested sequence is the next request, bail early if (expectedSequence == sequence) { return java.util.Optional.empty(); @@ -145,37 +114,49 @@ final class FrontendTransaction { return java.util.Optional.empty(); } - void purgeSequencesUpTo(final long sequence) { + final void purgeSequencesUpTo(final long sequence) { // FIXME: implement this lastPurgedSequence = sequence; } - // Sequence has already been checked - @Nullable TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope, + // Request order has already been checked by caller and replaySequence() + @SuppressWarnings("checkstyle:IllegalCatch") + @Nullable + final TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - if (request instanceof ModifyTransactionRequest) { - return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now); - } else if (request instanceof CommitLocalTransactionRequest) { - handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now); - return null; - } else if (request instanceof ExistsTransactionRequest) { - return handleExistsTransaction((ExistsTransactionRequest) request); - } else if (request instanceof ReadTransactionRequest) { - return handleReadTransaction((ReadTransactionRequest) request); - } else if (request instanceof TransactionPreCommitRequest) { - handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now); - return null; - } else if (request instanceof TransactionDoCommitRequest) { - handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); - return null; - } else if (request instanceof TransactionAbortRequest) { - return handleTransactionAbort((TransactionAbortRequest) request, envelope, now); - } else { - throw new UnsupportedRequestException(request); + if (request instanceof IncrementTransactionSequenceRequest) { + final IncrementTransactionSequenceRequest incr = (IncrementTransactionSequenceRequest) request; + expectedSequence += incr.getIncrement(); + + return recordSuccess(incr.getSequence(), + new IncrementTransactionSequenceSuccess(incr.getTarget(), incr.getSequence())); + } + + if (previousFailure != null) { + LOG.debug("{}: Rejecting request {} due to previous failure", persistenceId(), request, previousFailure); + throw previousFailure; + } + + try { + return doHandleRequest(request, envelope, now); + } catch (RuntimeException e) { + /* + * The request failed to process, we should not attempt to ever + * apply it again. Furthermore we cannot accept any further requests + * from this connection, simply because the transaction state is + * undefined. + */ + LOG.debug("{}: Request {} failed to process", persistenceId(), request, e); + previousFailure = new RuntimeRequestException("Request " + request + " failed to process", e); + throw previousFailure; } } + @Nullable + abstract TransactionSuccess doHandleRequest(TransactionRequest request, RequestEnvelope envelope, + long now) throws RequestException; + private void recordResponse(final long sequence, final Object response) { if (replayQueue.isEmpty()) { firstReplaySequence = sequence; @@ -184,7 +165,7 @@ final class FrontendTransaction { expectedSequence++; } - private > T recordSuccess(final long sequence, final T success) { + final > T recordSuccess(final long sequence, final T success) { recordResponse(sequence, success); return success; } @@ -193,219 +174,23 @@ final class FrontendTransaction { return history.readTime() - startTime; } - private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime, + final void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime, final TransactionSuccess success) { recordResponse(success.getSequence(), success); envelope.sendSuccess(success, executionTime(startTime)); } - private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime, + final void recordAndSendFailure(final RequestEnvelope envelope, final long startTime, final RuntimeRequestException failure) { recordResponse(envelope.getMessage().getSequence(), failure); envelope.sendFailure(failure, executionTime(startTime)); } - private void handleTransactionPreCommit(final TransactionPreCommitRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - readyCohort.preCommit(new FutureCallback() { - @Override - public void onSuccess(final DataTreeCandidate result) { - recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(), - request.getSequence())); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure)); - readyCohort = null; - } - }); - } - - private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope, - final long now) throws RequestException { - readyCohort.commit(new FutureCallback() { - @Override - public void onSuccess(final UnsignedLong result) { - successfulCommit(envelope, now); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure)); - readyCohort = null; - } - }); - } - - private TransactionSuccess handleTransactionAbort(final TransactionAbortRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - if (readyCohort == null) { - openTransaction.abort(); - return new TransactionAbortSuccess(id, request.getSequence()); - } - - readyCohort.abort(new FutureCallback() { - @Override - public void onSuccess(final Void result) { - readyCohort = null; - recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence())); - LOG.debug("Transaction {} aborted", id); - } - - @Override - public void onFailure(final Throwable failure) { - readyCohort = null; - LOG.warn("Transaction {} abort failed", id, failure); - recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure)); - } - }); - return null; - } - - private void coordinatedCommit(final RequestEnvelope envelope, final long now) { - readyCohort.canCommit(new FutureCallback() { - @Override - public void onSuccess(final Void result) { - recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(), - envelope.getMessage().getSequence())); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); - readyCohort = null; - } - }); - } - - private void directCommit(final RequestEnvelope envelope, final long now) { - readyCohort.canCommit(new FutureCallback() { - @Override - public void onSuccess(final Void result) { - successfulDirectCanCommit(envelope, now); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); - readyCohort = null; - } - }); - - } - - private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { - readyCohort.preCommit(new FutureCallback() { - @Override - public void onSuccess(final DataTreeCandidate result) { - successfulDirectPreCommit(envelope, startTime); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure)); - readyCohort = null; - } - }); - } - - private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { - readyCohort.commit(new FutureCallback() { - - @Override - public void onSuccess(final UnsignedLong result) { - successfulCommit(envelope, startTime); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure)); - readyCohort = null; - } - }); - } - - private void successfulCommit(final RequestEnvelope envelope, final long startTime) { - recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(), - envelope.getMessage().getSequence())); - readyCohort = null; - } - - private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - if (sealedModification.equals(request.getModification())) { - readyCohort = history.createReadyCohort(id, sealedModification); - - if (request.isCoordinated()) { - coordinatedCommit(envelope, now); - } else { - directCommit(envelope, now); - } - } else { - throw new UnsupportedRequestException(request); - } - } - - private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) - throws RequestException { - final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); - return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(), - data.isPresent())); - } - - private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) - throws RequestException { - final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); - return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data)); - } - - private ModifyTransactionSuccess replyModifySuccess(final long sequence) { - if (cachedModifySuccess == null) { - cachedModifySuccess = new ModifyTransactionSuccess(id, sequence); - } - - return recordSuccess(sequence, cachedModifySuccess); - } - - private @Nullable TransactionSuccess handleModifyTransaction(final ModifyTransactionRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - - final DataTreeModification modification = openTransaction.getSnapshot(); - for (TransactionModification m : request.getModifications()) { - if (m instanceof TransactionDelete) { - modification.delete(m.getPath()); - } else if (m instanceof TransactionWrite) { - modification.write(m.getPath(), ((TransactionWrite) m).getData()); - } else if (m instanceof TransactionMerge) { - modification.merge(m.getPath(), ((TransactionMerge) m).getData()); - } else { - LOG.warn("{}: ignoring unhandled modification {}", history.persistenceId(), m); - } - } - - final java.util.Optional maybeProto = request.getPersistenceProtocol(); - if (!maybeProto.isPresent()) { - return replyModifySuccess(request.getSequence()); - } - - switch (maybeProto.get()) { - case ABORT: - openTransaction.abort(); - openTransaction = null; - return replyModifySuccess(request.getSequence()); - case SIMPLE: - readyCohort = openTransaction.ready(); - openTransaction = null; - directCommit(envelope, now); - return null; - case THREE_PHASE: - readyCohort = openTransaction.ready(); - openTransaction = null; - coordinatedCommit(envelope, now); - return null; - default: - throw new UnsupportedRequestException(request); - } + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier()) + .add("expectedSequence", expectedSequence).add("firstReplaySequence", firstReplaySequence) + .add("lastPurgedSequence", lastPurgedSequence) + .toString(); } }