/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Optional;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.FutureCallback;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceSuccess;
import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
-import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
-import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
-import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
-import org.opendaylight.controller.cluster.access.commands.TransactionModification;
-import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.concepts.Identifiable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Frontend transaction state as observed by the shard leader.
+ * Frontend common transaction state as observed by the shard leader.
*
* @author Robert Varga
*/
@NotThreadSafe
-final class FrontendTransaction {
+abstract class FrontendTransaction implements Identifiable<TransactionIdentifier> {
private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class);
private final AbstractFrontendHistory history;
private Long lastPurgedSequence;
private long expectedSequence;
- private ReadWriteShardDataTreeTransaction openTransaction;
- private ModifyTransactionSuccess cachedModifySuccess;
- private DataTreeModification sealedModification;
- private ShardDataTreeCohort readyCohort;
+ private RequestException previousFailure;
- private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
- final ReadWriteShardDataTreeTransaction transaction) {
+ FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id) {
this.history = Preconditions.checkNotNull(history);
this.id = Preconditions.checkNotNull(id);
- this.openTransaction = Preconditions.checkNotNull(transaction);
}
- private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
- final DataTreeModification mod) {
- this.history = Preconditions.checkNotNull(history);
- this.id = Preconditions.checkNotNull(id);
- this.sealedModification = Preconditions.checkNotNull(mod);
+ @Override
+ public final TransactionIdentifier getIdentifier() {
+ return id;
}
- static FrontendTransaction createOpen(final AbstractFrontendHistory history,
- final ReadWriteShardDataTreeTransaction transaction) {
- return new FrontendTransaction(history, transaction.getIdentifier(), transaction);
+ final AbstractFrontendHistory history() {
+ return history;
}
- static FrontendTransaction createReady(final AbstractFrontendHistory history, final TransactionIdentifier id,
- final DataTreeModification mod) {
- return new FrontendTransaction(history, id, mod);
+ final String persistenceId() {
+ return history().persistenceId();
}
- java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
+ final java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
// Fast path check: if the requested sequence is the next request, bail early
if (expectedSequence == sequence) {
return java.util.Optional.empty();
return java.util.Optional.empty();
}
- void purgeSequencesUpTo(final long sequence) {
+ final void purgeSequencesUpTo(final long sequence) {
// FIXME: implement this
lastPurgedSequence = sequence;
}
- // Sequence has already been checked
- @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope)
- throws RequestException {
- if (request instanceof ModifyTransactionRequest) {
- return handleModifyTransaction((ModifyTransactionRequest) request, envelope);
- } else if (request instanceof CommitLocalTransactionRequest) {
- handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope);
- return null;
- } else if (request instanceof ExistsTransactionRequest) {
- return handleExistsTransaction((ExistsTransactionRequest) request);
- } else if (request instanceof ReadTransactionRequest) {
- return handleReadTransaction((ReadTransactionRequest) request);
- } else if (request instanceof TransactionPreCommitRequest) {
- handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope);
- return null;
- } else if (request instanceof TransactionDoCommitRequest) {
- handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope);
- return null;
- } else if (request instanceof TransactionAbortRequest) {
- handleTransactionAbort((TransactionAbortRequest) request, envelope);
- return null;
- } else {
- throw new UnsupportedRequestException(request);
+ // Request order has already been checked by caller and replaySequence()
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ @Nullable
+ final TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
+ if (request instanceof IncrementTransactionSequenceRequest) {
+ final IncrementTransactionSequenceRequest incr = (IncrementTransactionSequenceRequest) request;
+ expectedSequence += incr.getIncrement();
+
+ return recordSuccess(incr.getSequence(),
+ new IncrementTransactionSequenceSuccess(incr.getTarget(), incr.getSequence()));
+ }
+
+ if (previousFailure != null) {
+ LOG.debug("{}: Rejecting request {} due to previous failure", persistenceId(), request, previousFailure);
+ throw previousFailure;
+ }
+
+ try {
+ return doHandleRequest(request, envelope, now);
+ } catch (RuntimeException e) {
+ /*
+ * The request failed to process, we should not attempt to ever
+ * apply it again. Furthermore we cannot accept any further requests
+ * from this connection, simply because the transaction state is
+ * undefined.
+ */
+ LOG.debug("{}: Request {} failed to process", persistenceId(), request, e);
+ previousFailure = new RuntimeRequestException("Request " + request + " failed to process", e);
+ throw previousFailure;
}
}
+ @Nullable
+ abstract TransactionSuccess<?> doHandleRequest(TransactionRequest<?> request, RequestEnvelope envelope,
+ long now) throws RequestException;
+
private void recordResponse(final long sequence, final Object response) {
if (replayQueue.isEmpty()) {
firstReplaySequence = sequence;
expectedSequence++;
}
- private <T extends TransactionSuccess<?>> T recordSuccess(final long sequence, final T success) {
+ final <T extends TransactionSuccess<?>> T recordSuccess(final long sequence, final T success) {
recordResponse(sequence, success);
return success;
}
- private void recordAndSendSuccess(final RequestEnvelope envelope, final TransactionSuccess<?> success) {
- recordResponse(success.getSequence(), success);
- envelope.sendSuccess(success);
- }
-
- private void recordAndSendFailure(final RequestEnvelope envelope, final RuntimeRequestException failure) {
- recordResponse(envelope.getMessage().getSequence(), failure);
- envelope.sendFailure(failure);
+ private long executionTime(final long startTime) {
+ return history.readTime() - startTime;
}
- private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
- final RequestEnvelope envelope) throws RequestException {
- readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
- @Override
- public void onSuccess(final DataTreeCandidate result) {
- recordAndSendSuccess(envelope, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
- request.getSequence()));
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("Precommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope)
- throws RequestException {
- readyCohort.commit(new FutureCallback<UnsignedLong>() {
- @Override
- public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("Commit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope)
- throws RequestException {
- readyCohort.abort(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- readyCohort = null;
- recordAndSendSuccess(envelope, new TransactionAbortSuccess(id, request.getSequence()));
- LOG.debug("Transaction {} aborted", id);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- readyCohort = null;
- LOG.warn("Transaction {} abort failed", id, failure);
- recordAndSendFailure(envelope, new RuntimeRequestException("Abort failed", failure));
- }
- });
- }
-
- private void coordinatedCommit(final RequestEnvelope envelope) {
- readyCohort.canCommit(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- recordAndSendSuccess(envelope, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
- envelope.getMessage().getSequence()));
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void directCommit(final RequestEnvelope envelope) {
- readyCohort.canCommit(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- successfulDirectCanCommit(envelope);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
- readyCohort = null;
- }
- });
-
- }
-
- private void successfulDirectCanCommit(final RequestEnvelope envelope) {
- readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
- @Override
- public void onSuccess(final DataTreeCandidate result) {
- successfulDirectPreCommit(envelope);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("PreCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void successfulDirectPreCommit(final RequestEnvelope envelope) {
- readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
- @Override
- public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("DoCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void successfulCommit(final RequestEnvelope envelope) {
- recordAndSendSuccess(envelope, new TransactionCommitSuccess(readyCohort.getIdentifier(),
- envelope.getMessage().getSequence()));
- readyCohort = null;
- }
-
- private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
- final RequestEnvelope envelope) throws RequestException {
- if (sealedModification.equals(request.getModification())) {
- readyCohort = history.createReadyCohort(id, sealedModification);
-
- if (request.isCoordinated()) {
- coordinatedCommit(envelope);
- } else {
- directCommit(envelope);
- }
- } else {
- throw new UnsupportedRequestException(request);
- }
- }
-
- private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
- throws RequestException {
- final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
- return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(),
- data.isPresent()));
- }
-
- private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) throws RequestException {
- final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
- return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
+ final void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
+ final TransactionSuccess<?> success) {
+ recordResponse(success.getSequence(), success);
+ envelope.sendSuccess(success, executionTime(startTime));
}
- private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
- if (cachedModifySuccess == null) {
- cachedModifySuccess = new ModifyTransactionSuccess(id, sequence);
- }
-
- return recordSuccess(sequence, cachedModifySuccess);
+ final void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
+ final RuntimeRequestException failure) {
+ recordResponse(envelope.getMessage().getSequence(), failure);
+ envelope.sendFailure(failure, executionTime(startTime));
}
- private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
- final RequestEnvelope envelope) throws RequestException {
-
- final DataTreeModification modification = openTransaction.getSnapshot();
- for (TransactionModification m : request.getModifications()) {
- if (m instanceof TransactionDelete) {
- modification.delete(m.getPath());
- } else if (m instanceof TransactionWrite) {
- modification.write(m.getPath(), ((TransactionWrite) m).getData());
- } else if (m instanceof TransactionMerge) {
- modification.merge(m.getPath(), ((TransactionMerge) m).getData());
- } else {
- LOG.warn("{}: ignoring unhandled modification {}", history.persistenceId(), m);
- }
- }
-
- final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
- if (!maybeProto.isPresent()) {
- return replyModifySuccess(request.getSequence());
- }
-
- switch (maybeProto.get()) {
- case ABORT:
- openTransaction.abort();
- openTransaction = null;
- return replyModifySuccess(request.getSequence());
- case SIMPLE:
- readyCohort = openTransaction.ready();
- openTransaction = null;
- directCommit(envelope);
- return null;
- case THREE_PHASE:
- readyCohort = openTransaction.ready();
- openTransaction = null;
- coordinatedCommit(envelope);
- return null;
- default:
- throw new UnsupportedRequestException(request);
- }
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier())
+ .add("expectedSequence", expectedSequence).add("firstReplaySequence", firstReplaySequence)
+ .add("lastPurgedSequence", lastPurgedSequence)
+ .toString();
}
}