--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * A {@link RequestException} indicating that the backend has received a request to create a transaction which has
+ * already been purged.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class DeadTransactionException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ public DeadTransactionException(final long lastSeenTransaction) {
+ // Transaction counters are unsigned 64-bit values, hence the explicit unsigned rendering.
+ // "Transactions" (plural) to agree with "are accounted for".
+ super("Transactions up to " + Long.toUnsignedString(lastSeenTransaction) + " are accounted for");
+ }
+
+ @Override
+ public boolean isRetriable() {
+ // Retriable: the frontend can resynchronize its view of transaction state and re-issue
+ return true;
+ }
+}
import akka.actor.ActorRef;
import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
LocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
super(target, sequence, replyTo);
+ // NOTE(review): history identifier 0 appears to be reserved (standalone, history-less
+ // transactions?) -- confirm against LocalHistoryIdentifier's allocation scheme
+ Preconditions.checkArgument(target.getHistoryId() != 0, "History identifier must be non-zero");
}
LocalHistoryRequest(final T request, final ABIVersion version) {
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Response to a {@link ModifyTransactionRequest} which does not have a {@link PersistenceProtocol}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class ModifyTransactionSuccess extends TransactionSuccess<ModifyTransactionSuccess> {
+ private static final long serialVersionUID = 1L;
+
+ public ModifyTransactionSuccess(final TransactionIdentifier identifier, final long sequence) {
+ super(identifier, sequence);
+ }
+
+ // Version-translation copy constructor, invoked via cloneAsVersion()
+ private ModifyTransactionSuccess(final ModifyTransactionSuccess success, final ABIVersion version) {
+ super(success, version);
+ }
+
+ @Override
+ protected AbstractTransactionSuccessProxy<ModifyTransactionSuccess> externalizableProxy(final ABIVersion version) {
+ // Only the initial (Boron) serialization format exists, so version is ignored for now
+ return new ModifyTransactionSuccessProxyV1(this);
+ }
+
+ @Override
+ protected ModifyTransactionSuccess cloneAsVersion(final ABIVersion version) {
+ return new ModifyTransactionSuccess(this, version);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link ModifyTransactionSuccess}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class ModifyTransactionSuccessProxyV1 extends AbstractTransactionSuccessProxy<ModifyTransactionSuccess> {
+ private static final long serialVersionUID = 1L;
+
+ // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+ // be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public ModifyTransactionSuccessProxyV1() {
+ // For Externalizable
+ }
+
+ ModifyTransactionSuccessProxyV1(final ModifyTransactionSuccess success) {
+ super(success);
+ }
+
+ @Override
+ protected ModifyTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+ // NOTE(review): presumably invoked by AbstractTransactionSuccessProxy during deserialization;
+ // the superclass is not visible in this patch -- confirm
+ return new ModifyTransactionSuccess(target, sequence);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * General error raised when the recipient of a Request is not the correct backend to talk to. This typically
+ * means that the backend processing has moved and the frontend needs to run rediscovery and retry the request.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class NotLeaderException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ public NotLeaderException(final ActorRef me) {
+ super("Actor " + me + " is not the current leader");
+ }
+
+ @Override
+ public boolean isRetriable() {
+ // Not retriable against this recipient: per the class javadoc the frontend must first run
+ // rediscovery to locate the new leader, then re-issue the request there
+ return false;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * A {@link RequestException} indicating that the backend has received an unexpected request. This would typically
+ * mean that messages are not arriving in the sequence they were generated by the frontend.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class OutOfOrderRequestException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ public OutOfOrderRequestException(final long expectedRequest) {
+ // Sequence numbers are unsigned 64-bit values, hence the explicit unsigned rendering
+ super("Expecting request " + Long.toUnsignedString(expectedRequest));
+ }
+
+ @Override
+ public boolean isRetriable() {
+ // Retriable: the frontend can re-send its requests in the expected sequence
+ return true;
+ }
+}
super(identifier, sequence);
}
+ // Version-translation copy constructor, used by subclass cloneAsVersion() implementations
+ TransactionSuccess(final T success, final ABIVersion version) {
+ super(success, version);
+ }
+
@Override
protected abstract AbstractTransactionSuccessProxy<T> externalizableProxy(ABIVersion version);
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * A {@link RequestException} indicating that the backend has received a request referencing an unknown history. This
+ * typically happens when the linear history ID is newer than the highest observed {@link CreateLocalHistoryRequest}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class UnknownHistoryException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ public UnknownHistoryException(final Long lastSeenHistory) {
+ super("Last known history is " + historyToString(lastSeenHistory));
+ }
+
+ private static String historyToString(final Long history) {
+ return history == null ? "null" : Long.toUnsignedString(history.longValue());
+ }
+
+ @Override
+ public boolean isRetriable() {
+ return true;
+ }
+}
super(Preconditions.checkNotNull(message));
}
- protected RequestException(@Nonnull final String message, @Nonnull final Exception cause) {
+ // Cause widened from Exception to Throwable so failures delivered through
+ // FutureCallback.onFailure(Throwable) can be wrapped without an artificial cast
+ protected RequestException(@Nonnull final String message, @Nonnull final Throwable cause) {
super(Preconditions.checkNotNull(message), Preconditions.checkNotNull(cause));
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * General error raised when the recipient of a {@link Request} fails to process a request.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class RuntimeRequestException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ public RuntimeRequestException(final String message, final Throwable cause) {
+ super(message, cause);
+ // Note: super() already null-checks both arguments, so a null message/cause fails there first;
+ // the checks below additionally enforce a non-empty message
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(message), "Exception message is mandatory");
+ Preconditions.checkNotNull(cause);
+ }
+
+ @Override
+ public boolean isRetriable() {
+ // Hard failure: retrying the same request is not expected to succeed
+ return false;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * General error raised when the recipient of a {@link Request} determines that it does not know how to handle
+ * the request.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class UnsupportedRequestException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ public UnsupportedRequestException(final Request<?, ?> request) {
+ super("Unsupported request " + request.getClass());
+ }
+
+ @Override
+ public boolean isRetriable() {
+ // The recipient will never understand this request, hence retrying cannot help
+ return false;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract class for providing logical tracking of frontend local histories. This class is specialized for
+ * standalone transactions and chained transactions.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdentifier> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractFrontendHistory.class);
+ // Shared pre-allocated instance: its stack trace reflects class initialization, not the failing
+ // request site; it serves purely as a protocol-level signal
+ private static final OutOfOrderRequestException UNSEQUENCED_START = new OutOfOrderRequestException(0);
+
+ // All transactions observed on this history; per the FIXME below, entries are never removed yet
+ private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
+ private final String persistenceId;
+
+ AbstractFrontendHistory(final String persistenceId) {
+ this.persistenceId = Preconditions.checkNotNull(persistenceId);
+ }
+
+ final String persistenceId() {
+ return persistenceId;
+ }
+
+ /**
+ * Dispatch a transaction request, allocating transaction state on first contact. Returns an
+ * immediate success to send to the frontend, or null when the response will be delivered
+ * asynchronously through the envelope.
+ */
+ final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+ final RequestEnvelope envelope) throws RequestException {
+
+ // FIXME: handle purging of transactions
+
+ final TransactionIdentifier id = request.getTarget();
+ FrontendTransaction tx = transactions.get(id);
+ if (tx == null) {
+ // The transaction does not exist and we are about to create it, check sequence number
+ if (request.getSequence() != 0) {
+ LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
+ throw UNSEQUENCED_START;
+ }
+
+ // A CommitLocalTransactionRequest carries an already-sealed modification; anything else opens
+ // a new read-write transaction
+ if (request instanceof CommitLocalTransactionRequest) {
+ tx = createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
+ LOG.debug("{}: allocated new ready transaction {}", persistenceId(), id);
+ } else {
+ tx = createOpenTransaction(id);
+ LOG.debug("{}: allocated new open transaction {}", persistenceId(), id);
+ }
+
+ transactions.put(id, tx);
+ } else {
+ // Existing transaction: a repeated sequence indicates a lost response, replay it if retained
+ final Optional<TransactionSuccess<?>> replay = tx.replaySequence(request.getSequence());
+ if (replay.isPresent()) {
+ return replay.get();
+ }
+ }
+
+ return tx.handleRequest(request, envelope);
+ }
+
+ abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException;
+
+ abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod)
+ throws RequestException;
+
+ abstract ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod);
+}
import com.google.common.base.Preconditions;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
/**
* @param <T> Backing transaction type.
*/
@NotThreadSafe
-abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot> {
- private final T snapshot;
+abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
+ implements Identifiable<TransactionIdentifier> {
private final TransactionIdentifier id;
+ private final T snapshot;
+
private boolean closed;
- protected AbstractShardDataTreeTransaction(final TransactionIdentifier id, final T snapshot) {
+ // Visibility narrowed from protected: instantiation is confined to this package
+ AbstractShardDataTreeTransaction(final TransactionIdentifier id, final T snapshot) {
this.snapshot = Preconditions.checkNotNull(snapshot);
this.id = Preconditions.checkNotNull(id);
}
- final TransactionIdentifier getId() {
+ // getId() renamed to satisfy the Identifiable contract
+ @Override
+ public final TransactionIdentifier getIdentifier() {
return id;
}
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
}
@Override
- public ListenableFuture<Void> abort() {
- return delegate.abort();
+ public void abort(final FutureCallback<Void> callback) {
+ // Asynchronous abort: completion is signalled through the supplied callback instead of a future
+ delegate.abort(callback);
}
@Override
public State getState() {
return delegate.getState();
}
}
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
this.transaction = Preconditions.checkNotNull(transaction);
- this.transactionId = transaction.getId();
+ // Follows the rename of getId() to Identifiable.getIdentifier() on AbstractShardDataTreeTransaction
+ this.transactionId = transaction.getIdentifier();
this.clientVersion = clientVersion;
}
cohort.commit(callback);
}
- void abort() throws InterruptedException, ExecutionException, TimeoutException {
- cohort.abort().get();
+ // Abort made asynchronous: the blocking future.get() wait is replaced by callback-based completion
+ void abort(final FutureCallback<Void> callback) {
+ cohort.abort(callback);
}
void ready(final CohortDecorator cohortDecorator) {
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Queue;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Frontend transaction state as observed by the shard leader.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class FrontendTransaction {
+ private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class);
+
+ private final AbstractFrontendHistory history;
+ private final TransactionIdentifier id;
+
+ /**
+ * It is possible that after we process a request and send a response that response gets lost and the client
+ * initiates a retry. Since subsequent requests can mutate transaction state we need to retain the response until
+ * it is acknowledged by the client.
+ */
+ private final Queue<Object> replayQueue = new ArrayDeque<>();
+ // Sequence number of the oldest entry in replayQueue; later entries follow consecutively
+ private long firstReplaySequence;
+ // Highest sequence purged so far; null until the first purge occurs
+ private Long lastPurgedSequence;
+ // Sequence number expected from the next new request
+ private long expectedSequence;
+
+ // Exactly one of openTransaction/sealedModification is non-null, depending on which constructor ran
+ private ReadWriteShardDataTreeTransaction openTransaction;
+ private ModifyTransactionSuccess cachedModifySuccess;
+ private DataTreeModification sealedModification;
+ private ShardDataTreeCohort readyCohort;
+
+ private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
+ final ReadWriteShardDataTreeTransaction transaction) {
+ this.history = Preconditions.checkNotNull(history);
+ this.id = Preconditions.checkNotNull(id);
+ this.openTransaction = Preconditions.checkNotNull(transaction);
+ }
+
+ private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
+ final DataTreeModification mod) {
+ this.history = Preconditions.checkNotNull(history);
+ this.id = Preconditions.checkNotNull(id);
+ this.sealedModification = Preconditions.checkNotNull(mod);
+ }
+
+ static FrontendTransaction createOpen(final AbstractFrontendHistory history,
+ final ReadWriteShardDataTreeTransaction transaction) {
+ return new FrontendTransaction(history, transaction.getIdentifier(), transaction);
+ }
+
+ static FrontendTransaction createReady(final AbstractFrontendHistory history, final TransactionIdentifier id,
+ final DataTreeModification mod) {
+ return new FrontendTransaction(history, id, mod);
+ }
+
+ // java.util.Optional is fully qualified throughout because com.google.common.base.Optional is imported above
+ java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
+ // Fast path check: if the requested sequence is the next request, bail early
+ if (expectedSequence == sequence) {
+ return java.util.Optional.empty();
+ }
+
+ // Check sequencing: we do not need to bother with future requests
+ if (Long.compareUnsigned(expectedSequence, sequence) < 0) {
+ throw new OutOfOrderRequestException(expectedSequence);
+ }
+
+ // Sanity check: if we have purged sequences, this has to be newer
+ if (lastPurgedSequence != null && Long.compareUnsigned(lastPurgedSequence.longValue(), sequence) >= 0) {
+ // Client has sent a request sequence, which has already been purged. This is a hard error, which should
+ // never occur. Throwing an IllegalArgumentException will cause it to be wrapped in a
+ // RuntimeRequestException (which is not retriable) and report it back to the client.
+ throw new IllegalArgumentException(String.format("Invalid purged sequence %s (last purged is %s)",
+ sequence, lastPurgedSequence));
+ }
+
+ // At this point we have established that the requested sequence lies in the open interval
+ // (lastPurgedSequence, expectedSequence). That does not actually mean we have a response, as the commit
+ // machinery is asynchronous, hence a reply may be in the works and not available.
+
+ long replaySequence = firstReplaySequence;
+ final Iterator<?> it = replayQueue.iterator();
+ while (it.hasNext()) {
+ final Object replay = it.next();
+ if (replaySequence == sequence) {
+ // Recorded failures are re-thrown, recorded successes are re-sent
+ if (replay instanceof RequestException) {
+ throw (RequestException) replay;
+ }
+
+ Verify.verify(replay instanceof TransactionSuccess);
+ return java.util.Optional.of((TransactionSuccess<?>) replay);
+ }
+
+ replaySequence++;
+ }
+
+ // Not found
+ return java.util.Optional.empty();
+ }
+
+ void purgeSequencesUpTo(final long sequence) {
+ // FIXME: implement this
+
+ lastPurgedSequence = sequence;
+ }
+
+ // Sequence has already been checked
+ @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope)
+ throws RequestException {
+ // Handlers returning null reply asynchronously via the envelope once the commit machinery completes
+ if (request instanceof ModifyTransactionRequest) {
+ return handleModifyTransaction((ModifyTransactionRequest) request, envelope);
+ } else if (request instanceof CommitLocalTransactionRequest) {
+ handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope);
+ return null;
+ } else if (request instanceof ExistsTransactionRequest) {
+ return handleExistsTransaction((ExistsTransactionRequest) request);
+ } else if (request instanceof ReadTransactionRequest) {
+ return handleReadTransaction((ReadTransactionRequest) request);
+ } else if (request instanceof TransactionPreCommitRequest) {
+ handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope);
+ return null;
+ } else if (request instanceof TransactionDoCommitRequest) {
+ handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope);
+ return null;
+ } else if (request instanceof TransactionAbortRequest) {
+ handleTransactionAbort((TransactionAbortRequest) request, envelope);
+ return null;
+ } else {
+ throw new UnsupportedRequestException(request);
+ }
+ }
+
+ // NOTE(review): assumes sequence == expectedSequence when called; replaySequence() can also return
+ // empty for an older, not-yet-available response, which would skew these counters -- confirm
+ private void recordResponse(final long sequence, final Object response) {
+ if (replayQueue.isEmpty()) {
+ firstReplaySequence = sequence;
+ }
+ replayQueue.add(response);
+ expectedSequence++;
+ }
+
+ private <T extends TransactionSuccess<?>> T recordSuccess(final long sequence, final T success) {
+ recordResponse(sequence, success);
+ return success;
+ }
+
+ private void recordAndSendSuccess(final RequestEnvelope envelope, final TransactionSuccess<?> success) {
+ recordResponse(success.getSequence(), success);
+ envelope.sendSuccess(success);
+ }
+
+ private void recordAndSendFailure(final RequestEnvelope envelope, final RuntimeRequestException failure) {
+ recordResponse(envelope.getMessage().getSequence(), failure);
+ envelope.sendFailure(failure);
+ }
+
+ private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
+ final RequestEnvelope envelope) throws RequestException {
+ readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+ @Override
+ public void onSuccess(final DataTreeCandidate result) {
+ recordAndSendSuccess(envelope, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
+ request.getSequence()));
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, new RuntimeRequestException("Precommit failed", failure));
+ readyCohort = null;
+ }
+ });
+ }
+
+ private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope)
+ throws RequestException {
+ readyCohort.commit(new FutureCallback<UnsignedLong>() {
+ @Override
+ public void onSuccess(final UnsignedLong result) {
+ successfulCommit(envelope);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, new RuntimeRequestException("Commit failed", failure));
+ readyCohort = null;
+ }
+ });
+ }
+
+ private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope)
+ throws RequestException {
+ readyCohort.abort(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ readyCohort = null;
+ recordAndSendSuccess(envelope, new TransactionAbortSuccess(id, request.getSequence()));
+ LOG.debug("Transaction {} aborted", id);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ readyCohort = null;
+ LOG.warn("Transaction {} abort failed", id, failure);
+ recordAndSendFailure(envelope, new RuntimeRequestException("Abort failed", failure));
+ }
+ });
+ }
+
+ // Three-phase commit: reply after canCommit; the frontend drives preCommit/doCommit explicitly
+ private void coordinatedCommit(final RequestEnvelope envelope) {
+ readyCohort.canCommit(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ recordAndSendSuccess(envelope, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
+ envelope.getMessage().getSequence()));
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+ readyCohort = null;
+ }
+ });
+ }
+
+ // Direct commit: chain canCommit -> preCommit -> commit internally, replying only at the end
+ private void directCommit(final RequestEnvelope envelope) {
+ readyCohort.canCommit(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ successfulDirectCanCommit(envelope);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+ readyCohort = null;
+ }
+ });
+
+ }
+
+ private void successfulDirectCanCommit(final RequestEnvelope envelope) {
+ readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+ @Override
+ public void onSuccess(final DataTreeCandidate result) {
+ successfulDirectPreCommit(envelope);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, new RuntimeRequestException("PreCommit failed", failure));
+ readyCohort = null;
+ }
+ });
+ }
+
+ private void successfulDirectPreCommit(final RequestEnvelope envelope) {
+ readyCohort.commit(new FutureCallback<UnsignedLong>() {
+
+ @Override
+ public void onSuccess(final UnsignedLong result) {
+ successfulCommit(envelope);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, new RuntimeRequestException("DoCommit failed", failure));
+ readyCohort = null;
+ }
+ });
+ }
+
+ private void successfulCommit(final RequestEnvelope envelope) {
+ recordAndSendSuccess(envelope, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+ envelope.getMessage().getSequence()));
+ readyCohort = null;
+ }
+
+ private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
+ final RequestEnvelope envelope) throws RequestException {
+ // The request must carry the exact modification this instance was sealed with
+ if (sealedModification.equals(request.getModification())) {
+ readyCohort = history.createReadyCohort(id, sealedModification);
+
+ if (request.isCoordinated()) {
+ coordinatedCommit(envelope);
+ } else {
+ directCommit(envelope);
+ }
+ } else {
+ throw new UnsupportedRequestException(request);
+ }
+ }
+
+ private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
+ throws RequestException {
+ // NOTE(review): openTransaction is nulled once the transaction is readied or aborted -- confirm
+ // reads cannot arrive in that state, otherwise this throws NPE
+ final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+ return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(),
+ data.isPresent()));
+ }
+
+ private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) throws RequestException {
+ final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+ return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
+ }
+
+ // Lazily-instantiated, reused success message; the same instance serves repeated modify replies
+ private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
+ if (cachedModifySuccess == null) {
+ cachedModifySuccess = new ModifyTransactionSuccess(id, sequence);
+ }
+
+ return recordSuccess(sequence, cachedModifySuccess);
+ }
+
+ private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
+ final RequestEnvelope envelope) throws RequestException {
+
+ // Apply the batched modifications first, then act on the optional persistence protocol
+ final DataTreeModification modification = openTransaction.getSnapshot();
+ for (TransactionModification m : request.getModifications()) {
+ if (m instanceof TransactionDelete) {
+ modification.delete(m.getPath());
+ } else if (m instanceof TransactionWrite) {
+ modification.write(m.getPath(), ((TransactionWrite) m).getData());
+ } else if (m instanceof TransactionMerge) {
+ modification.merge(m.getPath(), ((TransactionMerge) m).getData());
+ } else {
+ LOG.warn("{}: ignoring unhandled modification {}", history.persistenceId(), m);
+ }
+ }
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
+ if (!maybeProto.isPresent()) {
+ return replyModifySuccess(request.getSequence());
+ }
+
+ switch (maybeProto.get()) {
+ case ABORT:
+ openTransaction.abort();
+ openTransaction = null;
+ return replyModifySuccess(request.getSequence());
+ case SIMPLE:
+ readyCohort = openTransaction.ready();
+ openTransaction = null;
+ directCommit(envelope);
+ return null;
+ case THREE_PHASE:
+ readyCohort = openTransaction.ready();
+ openTransaction = null;
+ coordinatedCommit(envelope);
+ return null;
+ default:
+ // Note: the modifications above have already been applied by the time we reject the protocol
+ throw new UnsupportedRequestException(request);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
+import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
+import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.UnknownHistoryException;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing
+ * in the frontend/backend conversation.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
+
+    // Histories which have not been purged
+    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories = new HashMap<>();
+
+    // RangeSet performs automatic merging, hence we keep minimal state tracking information
+    private final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
+
+    // Used for all standalone transactions
+    private final AbstractFrontendHistory standaloneHistory;
+    private final ShardDataTree tree;
+    private final ClientIdentifier clientId;
+    private final String persistenceId;
+
+    // Sequence number we expect on the next request from this frontend
+    private long expectedTxSequence;
+
+    // Largest history identifier observed so far; null until the first history is created
+    private Long lastSeenHistory = null;
+
+    // TODO: explicit failover notification
+    //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
+    //       to the frontend client -- that way it can immediately start sending requests
+
+    // TODO: add statistics:
+    // - number of requests processed
+    // - number of histories processed
+    // - per-RequestException throw counters
+
+    LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+        this.persistenceId = Preconditions.checkNotNull(persistenceId);
+        this.clientId = Preconditions.checkNotNull(clientId);
+        this.tree = Preconditions.checkNotNull(tree);
+        standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree);
+    }
+
+    @Override
+    public ClientIdentifier getIdentifier() {
+        return clientId;
+    }
+
+    // Requests have to arrive in the exact sequence we expect, otherwise the conversation is out of sync
+    private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException {
+        if (expectedTxSequence != envelope.getTxSequence()) {
+            throw new OutOfOrderRequestException(expectedTxSequence);
+        }
+    }
+
+    private void expectNextRequest() {
+        expectedTxSequence++;
+    }
+
+    /**
+     * Handle a request targeting a local history: create, destroy or purge.
+     *
+     * @param request the history request
+     * @param envelope enclosing envelope, used for sequence validation
+     * @return a success message to send to the frontend, or null if the reply is sent asynchronously
+     * @throws RequestException if the request is out of sequence or otherwise invalid
+     */
+    @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
+            final RequestEnvelope envelope) throws RequestException {
+        checkRequestSequence(envelope);
+
+        try {
+            if (request instanceof CreateLocalHistoryRequest) {
+                return handleCreateHistory((CreateLocalHistoryRequest) request);
+            } else if (request instanceof DestroyLocalHistoryRequest) {
+                return handleDestroyHistory((DestroyLocalHistoryRequest) request);
+            } else if (request instanceof PurgeLocalHistoryRequest) {
+                return handlePurgeHistory((PurgeLocalHistoryRequest)request);
+            } else {
+                throw new UnsupportedRequestException(request);
+            }
+        } finally {
+            // The sequence advances regardless of outcome, matching the frontend's expectations
+            expectNextRequest();
+        }
+    }
+
+    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
+        final LocalHistoryIdentifier id = request.getTarget();
+        final AbstractFrontendHistory existing = localHistories.get(id);
+        if (existing != null) {
+            // History already exists: report success
+            LOG.debug("{}: history {} already exists", persistenceId, id);
+            return new LocalHistorySuccess(id, request.getSequence());
+        }
+
+        // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
+        // end up resurrecting a purged history.
+        if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+            LOG.debug("{}: rejecting purged request {}", persistenceId, request);
+            throw new DeadHistoryException(lastSeenHistory.longValue());
+        }
+
+        // Update the last history we have seen. The initial (null) case must also perform the update,
+        // otherwise lastSeenHistory would never be populated and the DeadHistoryException path above
+        // would fail with a NullPointerException.
+        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
+            lastSeenHistory = id.getHistoryId();
+        }
+
+        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id)));
+        LOG.debug("{}: created history {}", persistenceId, id);
+        return new LocalHistorySuccess(id, request.getSequence());
+    }
+
+    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException {
+        final LocalHistoryIdentifier id = request.getTarget();
+        final LocalFrontendHistory existing = localHistories.get(id);
+        if (existing == null) {
+            // History does not exist: report success
+            LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId, id);
+            return new LocalHistorySuccess(id, request.getSequence());
+        }
+
+        return existing.destroy(request.getSequence());
+    }
+
+    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException {
+        final LocalHistoryIdentifier id = request.getTarget();
+        final LocalFrontendHistory existing = localHistories.remove(id);
+        if (existing != null) {
+            // Remember the identifier so a later create request cannot resurrect this history
+            purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
+
+            if (!existing.isDestroyed()) {
+                LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
+                existing.destroy(request.getSequence());
+            }
+
+            // FIXME: record a PURGE tombstone in the journal
+
+            LOG.debug("{}: purged history {}", persistenceId, id);
+        } else {
+            LOG.debug("{}: history {} has already been purged", persistenceId, id);
+        }
+
+        return new LocalHistorySuccess(id, request.getSequence());
+    }
+
+    /**
+     * Route a transaction request to the appropriate history. History identifier 0 is reserved for
+     * standalone (single) transactions; all other identifiers must name a known local history.
+     *
+     * @param request the transaction request
+     * @param envelope enclosing envelope, used for sequence validation
+     * @return a success message to send to the frontend, or null if the reply is sent asynchronously
+     * @throws RequestException if the request is out of sequence or targets an unknown history
+     */
+    @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+            final RequestEnvelope envelope) throws RequestException {
+        checkRequestSequence(envelope);
+
+        try {
+            final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
+            final AbstractFrontendHistory history;
+
+            if (lhId.getHistoryId() != 0) {
+                history = localHistories.get(lhId);
+                if (history == null) {
+                    LOG.debug("{}: rejecting unknown history request {}", persistenceId, request);
+                    throw new UnknownHistoryException(lastSeenHistory);
+                }
+            } else {
+                history = standaloneHistory;
+            }
+
+            return history.handleTransactionRequest(request, envelope);
+        } finally {
+            expectNextRequest();
+        }
+    }
+
+    // Called when a client re-establishes its connection: the sequence restarts from zero
+    void reconnect() {
+        expectedTxSequence = 0;
+    }
+
+    void retire() {
+        // FIXME: flush all state
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(LeaderFrontendState.class).add("clientId", clientId)
+                .add("purgedHistories", purgedHistories).toString();
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
+import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Chained transaction specialization of {@link AbstractFrontendHistory}. It prevents concurrent open transactions.
+ *
+ * @author Robert Varga
+ */
+final class LocalFrontendHistory extends AbstractFrontendHistory {
+    private enum State {
+        OPEN,
+        CLOSED,
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocalFrontendHistory.class);
+
+    // Backing transaction chain on the shard side
+    private final ShardDataTreeTransactionChain txChain;
+
+    // Identifier of the most recent transaction observed, null before the first one
+    private Long lastTxId;
+    // Lifecycle marker: OPEN until destroy() flips it to CLOSED
+    private State lifecycle = State.OPEN;
+
+    LocalFrontendHistory(final String persistenceId, final ShardDataTreeTransactionChain chain) {
+        super(persistenceId);
+        this.txChain = Preconditions.checkNotNull(chain);
+    }
+
+    @Override
+    public LocalHistoryIdentifier getIdentifier() {
+        return txChain.getIdentifier();
+    }
+
+    @Override
+    FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException {
+        checkDeadTransaction(id);
+        lastTxId = id.getTransactionId();
+        return FrontendTransaction.createOpen(this, txChain.newReadWriteTransaction(id));
+    }
+
+    @Override
+    FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod)
+            throws RequestException {
+        checkDeadTransaction(id);
+        lastTxId = id.getTransactionId();
+        return FrontendTransaction.createReady(this, id, mod);
+    }
+
+    @Override
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
+        return txChain.createReadyCohort(id, mod);
+    }
+
+    /**
+     * Close this history. Idempotent: subsequent calls only re-emit the success message.
+     */
+    LocalHistorySuccess destroy(final long sequence) throws RequestException {
+        if (lifecycle == State.OPEN) {
+            LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
+
+            // FIXME: add any finalization as needed
+            lifecycle = State.CLOSED;
+        }
+
+        // FIXME: record a DESTROY tombstone in the journal
+        return new LocalHistorySuccess(getIdentifier(), sequence);
+    }
+
+    boolean isDestroyed() {
+        return lifecycle == State.CLOSED;
+    }
+
+    private void checkDeadTransaction(final TransactionIdentifier id) throws RequestException {
+        // FIXME: check if this history is still open
+        // FIXME: check if the last transaction has been submitted
+
+        // Transaction identifiers within a local history have to have increasing IDs
+        if (lastTxId != null && Long.compareUnsigned(lastTxId, id.getTransactionId()) >= 0) {
+            throw new DeadTransactionException(lastTxId);
+        }
+    }
+}
public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
private final ShardDataTreeTransactionParent parent;
- protected ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent,
+ ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent,
final TransactionIdentifier id, final DataTreeModification modification) {
super(id, modification);
this.parent = Preconditions.checkNotNull(parent);
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
+import akka.actor.Status.Failure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Range;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
+ private static final Collection<ABIVersion> SUPPORTED_ABIVERSIONS;
+
+ static {
+ final ABIVersion[] values = ABIVersion.values();
+ final ABIVersion[] real = Arrays.copyOfRange(values, 1, values.length - 1);
+ SUPPORTED_ABIVERSIONS = ImmutableList.copyOf(real).reverse();
+ }
+
+ // FIXME: make this a dynamic property based on mailbox size and maximum number of clients
+ private static final int CLIENT_MAX_MESSAGES = 1000;
+
// The state of this Shard
private final ShardDataTree store;
private final ShardTransactionMessageRetrySupport messageRetrySupport;
private final FrontendMetadata frontendMetadata = new FrontendMetadata();
+ private final Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = new HashMap<>();
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
@Override
protected void handleNonRaftCommand(final Object message) {
try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
maybeError.get());
}
- if (CreateTransaction.isSerializedType(message)) {
+ if (message instanceof RequestEnvelope) {
+ final RequestEnvelope envelope = (RequestEnvelope)message;
+ try {
+ final RequestSuccess<?, ?> success = handleRequest(envelope);
+ if (success != null) {
+ envelope.sendSuccess(success);
+ }
+ } catch (RequestException e) {
+ LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
+ envelope.sendFailure(e);
+ } catch (Exception e) {
+ LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
+ envelope.sendFailure(new RuntimeRequestException("Request failed to process", e));
+ }
+ } else if (message instanceof ConnectClientRequest) {
+ handleConnectClient((ConnectClientRequest)message);
+ } else if (CreateTransaction.isSerializedType(message)) {
handleCreateTransaction(message);
} else if (message instanceof BatchedModifications) {
handleBatchedModifications((BatchedModifications)message);
}
}
+    /**
+     * Acquire our frontend tracking handle for a particular client, verifying that the request's client
+     * generation matches the one we are tracking.
+     *
+     * @param clientId client identifier carried by the request
+     * @return leader-side state for the client's current generation
+     * @throws RequestException ({@code RetiredGenerationException}) if the request comes from a generation
+     *         older than the one currently tracked
+     */
+    private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+        final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
+        if (existing != null) {
+            final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
+            if (cmp == 0) {
+                // Same generation: this is the state we are tracking
+                return existing;
+            }
+            if (cmp > 0) {
+                // Request from an older generation than the tracked one: reject it outright
+                LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
+                throw new RetiredGenerationException(existing.getIdentifier().getGeneration());
+            }
+
+            // Newer generation: retire the tracked state and fall through to create a fresh one
+            LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
+            existing.retire();
+            knownFrontends.remove(clientId.getFrontendId());
+        } else {
+            LOG.debug("{}: client {} is not yet known", persistenceId(), clientId);
+        }
+
+        final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store);
+        knownFrontends.put(clientId.getFrontendId(), ret);
+        LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId);
+        return ret;
+    }
+
+    /**
+     * Select the ABI version to talk to a connecting client: the first supported version falling
+     * within the client's advertised [min, max] range. SUPPORTED_ABIVERSIONS is ordered so that
+     * preferred versions come first (see its initialization).
+     *
+     * @throws IllegalArgumentException when no common version exists
+     */
+    private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
+        final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
+        for (ABIVersion candidate : SUPPORTED_ABIVERSIONS) {
+            if (clientRange.contains(candidate)) {
+                return candidate;
+            }
+        }
+
+        throw new IllegalArgumentException(String.format(
+            "No common version between backend versions %s and client versions %s", SUPPORTED_ABIVERSIONS,
+            clientRange));
+    }
+
+    /**
+     * Handle a frontend connection request: negotiate a common ABI version, reset the per-client
+     * sequencing state and reply directly to the requester (this message travels outside the
+     * RequestEnvelope mechanism).
+     */
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void handleConnectClient(final ConnectClientRequest message) {
+        try {
+            if (!isLeader() || !isLeaderActive()) {
+                // Fail fast: only an active leader may accept connections
+                LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message);
+                throw new NotLeaderException(getSelf());
+            }
+
+            // selectVersion() may throw IllegalArgumentException when no common version exists; caught below
+            final ABIVersion selectedVersion = selectVersion(message);
+            final LeaderFrontendState frontend = getFrontend(message.getTarget());
+            frontend.reconnect();
+            message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(),
+                ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion),
+                ActorRef.noSender());
+        } catch (RequestException | RuntimeException e) {
+            // Report any failure to the requester as an akka Status.Failure
+            message.getReplyTo().tell(new Failure(e), ActorRef.noSender());
+        }
+    }
+
+    /**
+     * Dispatch a frontend request envelope to the per-client leader state.
+     *
+     * @param envelope the request envelope
+     * @return a success message to send back, or null when the reply will be sent asynchronously
+     * @throws RequestException when we are not the active leader, the client generation is stale,
+     *         or the request type is not recognized
+     */
+    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope) throws RequestException {
+        // We are not the leader, hence we want to fail-fast.
+        if (!isLeader() || !isLeaderActive()) {
+            LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
+            throw new NotLeaderException(getSelf());
+        }
+
+        final Request<?, ?> request = envelope.getMessage();
+        if (request instanceof TransactionRequest) {
+            final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
+            final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
+            return getFrontend(clientId).handleTransactionRequest(txReq, envelope);
+        } else if (request instanceof LocalHistoryRequest) {
+            final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
+            final ClientIdentifier clientId = lhReq.getTarget().getClientId();
+            return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope);
+        } else {
+            LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
+            throw new UnsupportedRequestException(request);
+        }
+    }
+
private boolean hasLeader() {
return getLeaderId() != null;
}
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionId(), e);
- sender.tell(new akka.actor.Status.Failure(e), getSelf());
+ sender.tell(new Failure(e), getSelf());
}
}
private boolean failIfIsolatedLeader(final ActorRef sender) {
if (isIsolatedLeader()) {
- sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ sender.tell(new Failure(new NoShardLeaderException(String.format(
"Shard %s was the leader but has lost contact with all of its followers. Either all"
+ " other follower nodes are down or this node is isolated by a network partition.",
persistenceId()))), getSelf());
} catch (Exception e) {
LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionId(), e);
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ getSender().tell(new Failure(e), getSelf());
}
} else {
ActorSelection leader = getLeader();
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
+ getSender().tell(new Failure(new NoShardLeaderException(
"Could not create a shard transaction", persistenceId())), getSelf());
}
}
getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
} catch (Exception e) {
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ getSender().tell(new Failure(e), getSelf());
}
}
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
log.debug("{}: Aborting transaction {}", name, transactionID);
final ActorRef self = shard.getSelf();
- try {
- cohortEntry.abort();
+ cohortEntry.abort(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ if (sender != null) {
+ sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+ }
+ }
- shard.getShardMBean().incrementAbortTransactionsCount();
+ @Override
+ public void onFailure(final Throwable failure) {
+ log.error("{}: An exception happened during abort", name, failure);
- if (sender != null) {
- sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+ if (sender != null) {
+ sender.tell(new Failure(failure), self);
+ }
}
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.error("{}: An exception happened during abort", name, e);
+ });
- if (sender != null) {
- sender.tell(new Failure(e), self);
- }
- }
+ shard.getShardMBean().incrementAbortTransactionsCount();
}
void checkForExpiredTransactions(final long timeout, final Shard shard) {
}
}
- private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
+ ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
if (chain == null) {
chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
final DataTreeModification snapshot = transaction.getSnapshot();
snapshot.ready();
- return createReadyCohort(transaction.getId(), snapshot);
+ return createReadyCohort(transaction.getIdentifier(), snapshot);
}
public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
cohortRegistry.process(sender, message);
}
+ @Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
final DataTreeModification modification) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
public abstract void preCommit(FutureCallback<DataTreeCandidate> callback);
@VisibleForTesting
- public abstract ListenableFuture<Void> abort();
+ public abstract void abort(FutureCallback<Void> callback);
@VisibleForTesting
public abstract void commit(FutureCallback<UnsignedLong> callback);
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* A transaction chain attached to a Shard.
*/
@NotThreadSafe
-final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent {
+final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
+ implements Identifiable<LocalHistoryIdentifier> {
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class);
- private final ShardDataTree dataTree;
private final LocalHistoryIdentifier chainId;
+ private final ShardDataTree dataTree;
private ReadWriteShardDataTreeTransaction previousTx;
private ReadWriteShardDataTreeTransaction openTransaction;
private boolean closed;
ShardDataTreeTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier, final ShardDataTree dataTree) {
- this.dataTree = Preconditions.checkNotNull(dataTree);
this.chainId = Preconditions.checkNotNull(localHistoryIdentifier);
+ this.dataTree = Preconditions.checkNotNull(dataTree);
}
private DataTreeSnapshot getSnapshot() {
return MoreObjects.toStringHelper(this).add("id", chainId).toString();
}
- void clearTransaction(ReadWriteShardDataTreeTransaction transaction) {
+ void clearTransaction(final ReadWriteShardDataTreeTransaction transaction) {
if (transaction.equals(previousTx)) {
previousTx = null;
}
}
+
+ @Override
+ public LocalHistoryIdentifier getIdentifier() {
+ return chainId;
+ }
+
+ @Override
+ ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification modification) {
+ return dataTree.createReadyCohort(txId, modification);
+ }
}
*/
package org.opendaylight.controller.cluster.datastore;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+
abstract class ShardDataTreeTransactionParent {
+
abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
+
+ abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
}
public ShardReadTransaction(AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
ShardStats shardStats) {
- super(shardActor, shardStats, transaction.getId());
+ super(shardActor, shardStats, transaction.getIdentifier());
this.transaction = Preconditions.checkNotNull(transaction);
}
public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
ShardStats shardStats) {
- super(shardActor, shardStats, transaction.getId());
+ super(shardActor, shardStats, transaction.getIdentifier());
this.transaction = transaction;
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
}
}
+
@Override
- public ListenableFuture<Void> abort() {
+ public void abort(final FutureCallback<Void> callback) {
dataTree.startAbort(this);
state = State.ABORTED;
final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
if (!maybeAborts.isPresent()) {
- return VOID_FUTURE;
+ callback.onSuccess(null);
+ return;
}
final Future<Iterable<Object>> aborts = maybeAborts.get();
if (aborts.isCompleted()) {
- return VOID_FUTURE;
+ callback.onSuccess(null);
+ return;
}
- final SettableFuture<Void> ret = SettableFuture.create();
aborts.onComplete(new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(final Throwable failure, final Iterable<Object> objs) {
if (failure != null) {
- ret.setException(failure);
+ callback.onFailure(failure);
} else {
- ret.set(null);
+ callback.onSuccess(null);
}
}
}, ExecutionContexts.global());
-
- return ret;
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+
+/**
+ * Standalone transaction specialization of {@link AbstractFrontendHistory}. There can be multiple open transactions
+ * and they are submitted in any order.
+ *
+ * @author Robert Varga
+ */
+final class StandaloneFrontendHistory extends AbstractFrontendHistory {
+    // Synthetic identifier: standalone transactions always live in history 0
+    private final LocalHistoryIdentifier standaloneId;
+    private final ShardDataTree shardTree;
+
+    StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+        super(persistenceId);
+        this.standaloneId = new LocalHistoryIdentifier(clientId, 0);
+        this.shardTree = Preconditions.checkNotNull(tree);
+    }
+
+    @Override
+    public LocalHistoryIdentifier getIdentifier() {
+        return standaloneId;
+    }
+
+    @Override
+    FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException {
+        // Standalone transactions are independent of each other: no ordering checks are performed
+        return FrontendTransaction.createOpen(this, shardTree.newReadWriteTransaction(id));
+    }
+
+    @Override
+    FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod)
+            throws RequestException {
+        return FrontendTransaction.createReady(this, id, mod);
+    }
+
+    @Override
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
+        return shardTree.createReadyCohort(id, mod);
+    }
+}
import com.google.common.base.Optional;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Collections;
private FutureCallback<DataTreeCandidate> preCommit;
private FutureCallback<UnsignedLong> commit;
- public void setDelegate(ShardDataTreeCohort delegate) {
+ public void setDelegate(final ShardDataTreeCohort delegate) {
this.delegate = delegate;
}
}
@Override
- public void canCommit(FutureCallback<Void> callback) {
+ public void canCommit(final FutureCallback<Void> callback) {
canCommit = mockFutureCallback(callback);
delegate.canCommit(canCommit);
}
@Override
- public void preCommit(FutureCallback<DataTreeCandidate> callback) {
+ public void preCommit(final FutureCallback<DataTreeCandidate> callback) {
preCommit = mockFutureCallback(callback);
delegate.preCommit(preCommit);
}
@Override
- public void commit(FutureCallback<UnsignedLong> callback) {
+ public void commit(final FutureCallback<UnsignedLong> callback) {
commit = mockFutureCallback(callback);
delegate.commit(commit);
}
}
@Override
- public ListenableFuture<Void> abort() {
- return delegate.abort();
+ public void abort(final FutureCallback<Void> callback) {
+ delegate.abort(callback);
}
@Override
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collections;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
verify(mockUserCohorts).abort();
}
+ // Test helper: adapts the callback-based ShardDataTreeCohort.abort(FutureCallback) API
+ // to a Future so tests can block on abort completion (e.g. abort(cohort).get()).
+ private static Future<?> abort(final ShardDataTreeCohort cohort) {
+ final CompletableFuture<Void> f = new CompletableFuture<>();
+ cohort.abort(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ // Successful abort completes the future with no value
+ f.complete(null);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ // Propagate the abort failure so Future.get() rethrows it as ExecutionException
+ f.completeExceptionally(failure);
+ }
+ });
+
+ return f;
+ }
+
@Test
public void testAbort() throws Exception {
doNothing().when(mockShardDataTree).startAbort(cohort);
- cohort.abort().get();
-
+ abort(cohort).get();
verify(mockShardDataTree).startAbort(cohort);
}
final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort();
- final ListenableFuture<Void> abortFuture = cohort.abort();
+ final Future<?> abortFuture = abort(cohort);
cohortFuture.success(Collections.emptyList());