extends TransactionRequest<T> {
private static final long serialVersionUID = 1L;
private final YangInstanceIdentifier path;
+ private final boolean snapshotOnly;
AbstractReadTransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo,
- final YangInstanceIdentifier path) {
+ final YangInstanceIdentifier path, final boolean snapshotOnly) {
super(identifier, sequence, replyTo);
this.path = Preconditions.checkNotNull(path);
+ this.snapshotOnly = snapshotOnly;
}
AbstractReadTransactionRequest(final T request, final ABIVersion version) {
super(request, version);
this.path = request.getPath();
+ this.snapshotOnly = request.isSnapshotOnly();
}
@Nonnull
return path;
}
+ public final boolean isSnapshotOnly() {
+ return snapshotOnly;
+ }
+
@Override
protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
return super.addToStringAttributes(toStringHelper).add("path", path);
extends AbstractTransactionRequestProxy<T> {
private static final long serialVersionUID = 1L;
private YangInstanceIdentifier path;
+ private boolean snapshotOnly;
protected AbstractReadTransactionRequestProxyV1() {
// For Externalizable
AbstractReadTransactionRequestProxyV1(final T request) {
super(request);
path = request.getPath();
+ snapshotOnly = request.isSnapshotOnly();
}
@Override
try (NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out)) {
nnout.writeYangInstanceIdentifier(path);
}
+ out.writeBoolean(snapshotOnly);
}
@Override
public final void readExternal(final ObjectInput in) throws ClassNotFoundException, IOException {
super.readExternal(in);
path = NormalizedNodeInputOutput.newDataInput(in).readYangInstanceIdentifier();
+ snapshotOnly = in.readBoolean();
}
@Override
protected final T createRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
- return createReadRequest(target, sequence, replyTo, path);
+ return createReadRequest(target, sequence, replyTo, path, snapshotOnly);
}
abstract T createReadRequest(TransactionIdentifier target, long sequence, ActorRef replyTo,
- YangInstanceIdentifier requestPath);
+ YangInstanceIdentifier requestPath, boolean snapshotOnly);
}
private static final long serialVersionUID = 1L;
public ExistsTransactionRequest(@Nonnull final TransactionIdentifier identifier, final long sequence,
- @Nonnull final ActorRef replyTo, @Nonnull final YangInstanceIdentifier path) {
- super(identifier, sequence, replyTo, path);
+ @Nonnull final ActorRef replyTo, @Nonnull final YangInstanceIdentifier path, final boolean snapshotOnly) {
+ super(identifier, sequence, replyTo, path, snapshotOnly);
}
private ExistsTransactionRequest(final ExistsTransactionRequest request, final ABIVersion version) {
@Override
ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
- final ActorRef replyTo, final YangInstanceIdentifier path) {
- return new ExistsTransactionRequest(target, sequence, replyTo, path);
+ final ActorRef replyTo, final YangInstanceIdentifier path, final boolean snapshotOnly) {
+ return new ExistsTransactionRequest(target, sequence, replyTo, path, snapshotOnly);
}
}
private static final long serialVersionUID = 1L;
public ReadTransactionRequest(@Nonnull final TransactionIdentifier identifier, final long sequence,
- @Nonnull final ActorRef replyTo, @Nonnull final YangInstanceIdentifier path) {
- super(identifier, sequence, replyTo, path);
+ @Nonnull final ActorRef replyTo, @Nonnull final YangInstanceIdentifier path, final boolean snapshotOnly) {
+ super(identifier, sequence, replyTo, path, snapshotOnly);
}
private ReadTransactionRequest(final ReadTransactionRequest request, final ABIVersion version) {
@Override
ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
- final ActorRef replyTo, final YangInstanceIdentifier path) {
- return new ReadTransactionRequest(target, sequence, replyTo, path);
+ final ActorRef replyTo, final YangInstanceIdentifier path, final boolean snapshotOnly) {
+ return new ReadTransactionRequest(target, sequence, replyTo, path, snapshotOnly);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract superclass of both ClientSnapshot and ClientTransaction. Provided for convenience.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public abstract class AbstractClientHandle<T extends AbstractProxyTransaction> extends LocalAbortable
+ implements Identifiable<TransactionIdentifier> {
+ /*
+ * Our state consist of the the proxy map, hence we just subclass ConcurrentHashMap directly.
+ */
+ private static final class State<T> extends ConcurrentHashMap<Long, T> {
+ private static final long serialVersionUID = 1L;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHandle.class);
+ @SuppressWarnings("rawtypes")
+ private static final AtomicReferenceFieldUpdater<AbstractClientHandle, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractClientHandle.class, State.class, "state");
+
+ private final TransactionIdentifier transactionId;
+ private final AbstractClientHistory parent;
+
+ private volatile State<T> state = new State<>();
+
+ // Hidden to prevent outside instantiation
+ AbstractClientHandle(final AbstractClientHistory parent, final TransactionIdentifier transactionId) {
+ this.transactionId = Preconditions.checkNotNull(transactionId);
+ this.parent = Preconditions.checkNotNull(parent);
+ }
+
+ @Override
+ public final TransactionIdentifier getIdentifier() {
+ return transactionId;
+ }
+
+ /**
+ * Release all state associated with this transaction.
+ *
+ * @return True if this transaction became closed during this call
+ */
+ public final boolean abort() {
+ if (commonAbort()) {
+ parent.onTransactionAbort(this);
+ return true;
+ }
+
+ return false;
+ }
+
+ private boolean commonAbort() {
+ final Collection<T> toClose = ensureClosed();
+ if (toClose == null) {
+ return false;
+ }
+
+ toClose.forEach(AbstractProxyTransaction::abort);
+ return true;
+ }
+
+ @Override
+ final void localAbort(final Throwable cause) {
+ LOG.debug("Local abort of transaction {}", getIdentifier(), cause);
+ commonAbort();
+ }
+
+ /**
+ * Make sure this snapshot is closed. If it became closed as the effect of this call, return a collection of
+ * {@link AbstractProxyTransaction} handles which need to be closed, too.
+ *
+ * @return null if this snapshot has already been closed, otherwise a collection of proxies, which need to be
+ * closed, too.
+ */
+ @Nullable final Collection<T> ensureClosed() {
+ @SuppressWarnings("unchecked")
+ final State<T> local = STATE_UPDATER.getAndSet(this, null);
+ return local == null ? null : local.values();
+ }
+
+ final T ensureProxy(final YangInstanceIdentifier path, final Function<Long, T> createProxy) {
+ final Map<Long, T> local = getState();
+ final Long shard = parent.resolveShardForPath(path);
+
+ return local.computeIfAbsent(shard, createProxy);
+ }
+
+ final AbstractClientHistory parent() {
+ return parent;
+ }
+
+ private State<T> getState() {
+ final State<T> local = state;
+ Preconditions.checkState(local != null, "Transaction %s is closed", transactionId);
+ return local;
+ }
+}
AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
@GuardedBy("this")
- private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
+ private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
@GuardedBy("this")
private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
LOG.debug("Force-closing history {}", getIdentifier(), cause);
synchronized (this) {
- for (ClientTransaction t : openTransactions.values()) {
+ for (AbstractClientHandle<?> t : openTransactions.values()) {
t.localAbort(cause);
}
openTransactions.clear();
LOG.debug("Create history response {}", response);
}
- final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
+ private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
while (true) {
- final ProxyHistory history;
try {
- history = histories.computeIfAbsent(shard, this::createHistoryProxy);
+ return histories.computeIfAbsent(shard, this::createHistoryProxy);
} catch (InversibleLockException e) {
LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
e.awaitResolution();
LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
- continue;
}
+ }
+ }
+
+ final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) {
+ return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
+ }
+
+ final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
+ return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
+ }
- return history.createTransactionProxy(transactionId);
+ private void checkNotClosed() {
+ if (state == State.CLOSED) {
+ throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
}
}
/**
- * Allocate a {@link ClientTransaction}.
+ * Allocate a new {@link ClientTransaction}.
*
* @return A new {@link ClientTransaction}
* @throws TransactionChainClosedException if this history is closed
+ * @throws IllegalStateException if a previous dependent transaction has not been closed
*/
public final ClientTransaction createTransaction() {
- if (state == State.CLOSED) {
- throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
- }
+ checkNotClosed();
synchronized (this) {
final ClientTransaction ret = doCreateTransaction();
}
}
+ /**
+ * Create a new {@link ClientSnapshot}.
+ *
+ * @return A new {@link ClientSnapshot}
+ * @throws TransactionChainClosedException if this history is closed
+ * @throws IllegalStateException if a previous dependent transaction has not been closed
+ */
+ public final ClientSnapshot takeSnapshot() {
+ checkNotClosed();
+
+ synchronized (this) {
+ final ClientSnapshot ret = doCreateSnapshot();
+ openTransactions.put(ret.getIdentifier(), ret);
+ return ret;
+ }
+ }
+
+ @GuardedBy("this")
+ abstract ClientSnapshot doCreateSnapshot();
+
@GuardedBy("this")
abstract ClientTransaction doCreateTransaction();
* @param txId Transaction identifier
* @param cohort Transaction commit cohort
*/
- synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+ synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
final AbstractTransactionCommitCohort cohort) {
- final ClientTransaction tx = openTransactions.remove(txId);
- Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
+ final TransactionIdentifier txId = tx.getIdentifier();
+ if (openTransactions.remove(txId) == null) {
+ LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
+ }
final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
* Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
* backend.
*
- * @param txId transaction identifier
+ * @param snapshot transaction identifier
*/
- synchronized void onTransactionAbort(final TransactionIdentifier txId) {
- if (openTransactions.remove(txId) == null) {
- LOG.warn("Could not find aborting transaction {}", txId);
+ synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
+ if (openTransactions.remove(snapshot.getIdentifier()) == null) {
+ LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
}
}
return singleHistory.createTransaction();
}
+ @Override
+ public final ClientSnapshot createSnapshot() {
+ return singleHistory.doCreateSnapshot();
+ }
+
@Override
public final void close() {
context().executeInActor(this::shutdown);
}
final void delete(final YangInstanceIdentifier path) {
+ checkReadWrite();
checkNotSealed();
doDelete(path);
}
final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ checkReadWrite();
checkNotSealed();
doMerge(path, data);
}
final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ checkReadWrite();
checkNotSealed();
doWrite(path, data);
}
return (SuccessorState) local;
}
+ private void checkReadWrite() {
+ if (isSnapshotOnly()) {
+ throw new UnsupportedOperationException("Transaction " + getIdentifier() + " is a read-only snapshot");
+ }
+ }
+
final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
successfulRequests.add(Verify.verifyNotNull(req));
}
* @return Future completion
*/
final ListenableFuture<Boolean> directCommit() {
+ checkReadWrite();
checkSealed();
// Precludes startReconnect() from interfering with the fast path
}
final void canCommit(final VotingFuture<?> ret) {
+ checkReadWrite();
checkSealed();
// Precludes startReconnect() from interfering with the fast path
}
final void preCommit(final VotingFuture<?> ret) {
+ checkReadWrite();
checkSealed();
final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
}
final void doCommit(final VotingFuture<?> ret) {
+ checkReadWrite();
checkSealed();
sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
}
}
+ abstract boolean isSnapshotOnly();
+
abstract void doDelete(final YangInstanceIdentifier path);
abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
}
}
- @Override
- ClientTransaction doCreateTransaction() {
+ private State ensureIdleState() {
final State local = state();
Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
- updateState(local, State.TX_OPEN);
+ return local;
+ }
+
+ @Override
+ ClientSnapshot doCreateSnapshot() {
+ ensureIdleState();
+ return new ClientSnapshot(this, new TransactionIdentifier(getIdentifier(), nextTx()));
+ }
+ @Override
+ ClientTransaction doCreateTransaction() {
+ updateState(ensureIdleState(), State.TX_OPEN);
return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx()));
}
@Override
- void onTransactionAbort(final TransactionIdentifier txId) {
- final State local = state();
- if (local == State.TX_OPEN) {
- updateState(local, State.IDLE);
+ void onTransactionAbort(final AbstractClientHandle<?> snap) {
+ if (snap instanceof ClientTransaction) {
+ final State local = state();
+ if (local == State.TX_OPEN) {
+ updateState(local, State.IDLE);
+ }
}
- super.onTransactionAbort(txId);
+ super.onTransactionAbort(snap);
}
@Override
- AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+ AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
final AbstractTransactionCommitCohort cohort) {
+
final State local = state();
switch (local) {
case CLOSED:
- return super.onTransactionReady(txId, cohort);
+ return super.onTransactionReady(tx, cohort);
case IDLE:
throw new IllegalStateException(String.format("Local history %s is idle when readying transaction %s",
- this, txId));
+ this, tx.getIdentifier()));
case TX_OPEN:
updateState(local, State.IDLE);
- return super.onTransactionReady(txId, cohort);
+ return super.onTransactionReady(tx, cohort);
default:
throw new IllegalStateException(String.format("Local history %s in unhandled state %s", this, local));
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Snapshot of the datastore state. Note this snapshot is not consistent across shards because sub-shard snapshots are
+ * created lazily.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public class ClientSnapshot extends AbstractClientHandle<AbstractProxyTransaction> {
+ // Hidden to prevent outside instantiation
+ ClientSnapshot(final AbstractClientHistory parent, final TransactionIdentifier transactionId) {
+ super(parent, transactionId);
+ }
+
+ private AbstractProxyTransaction createProxy(final Long shard) {
+ return parent().createSnapshotProxy(getIdentifier(), shard);
+ }
+
+ private AbstractProxyTransaction ensureSnapshotProxy(final YangInstanceIdentifier path) {
+ return ensureProxy(path, this::createProxy);
+ }
+
+ public final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+ return ensureSnapshotProxy(path).exists(path);
+ }
+
+ public final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+ final YangInstanceIdentifier path) {
+ return ensureSnapshotProxy(path).read(path);
+ }
+}
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.CheckedFuture;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.Collection;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Client-side view of a free-standing transaction.
+ * Client-side view of a transaction.
*
* <p>
* This interface is used by the world outside of the actor system and in the actor system it is manifested via
* @author Robert Varga
*/
@Beta
-public final class ClientTransaction extends LocalAbortable implements Identifiable<TransactionIdentifier> {
- private static final Logger LOG = LoggerFactory.getLogger(ClientTransaction.class);
- private static final AtomicIntegerFieldUpdater<ClientTransaction> STATE_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(ClientTransaction.class, "state");
- private static final int OPEN_STATE = 0;
- private static final int CLOSED_STATE = 1;
-
- private final Map<Long, AbstractProxyTransaction> proxies = new ConcurrentHashMap<>();
- private final TransactionIdentifier transactionId;
- private final AbstractClientHistory parent;
-
- private volatile int state = OPEN_STATE;
+public final class ClientTransaction extends AbstractClientHandle<AbstractProxyTransaction> {
ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) {
- this.transactionId = Preconditions.checkNotNull(transactionId);
- this.parent = Preconditions.checkNotNull(parent);
+ super(parent, transactionId);
}
- private void checkNotClosed() {
- Preconditions.checkState(state == OPEN_STATE, "Transaction %s is closed", transactionId);
- }
private AbstractProxyTransaction createProxy(final Long shard) {
- return parent.createTransactionProxy(transactionId, shard);
- }
-
- private AbstractProxyTransaction ensureProxy(final YangInstanceIdentifier path) {
- checkNotClosed();
-
- final Long shard = parent.resolveShardForPath(path);
- return proxies.computeIfAbsent(shard, this::createProxy);
+ return parent().createTransactionProxy(getIdentifier(), shard);
}
- @Override
- public TransactionIdentifier getIdentifier() {
- return transactionId;
+ private AbstractProxyTransaction ensureTransactionProxy(final YangInstanceIdentifier path) {
+ return ensureProxy(path, this::createProxy);
}
public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
- return ensureProxy(path).exists(path);
+ return ensureTransactionProxy(path).exists(path);
}
- public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
- return ensureProxy(path).read(path);
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+ final YangInstanceIdentifier path) {
+ return ensureTransactionProxy(path).read(path);
}
public void delete(final YangInstanceIdentifier path) {
- ensureProxy(path).delete(path);
+ ensureTransactionProxy(path).delete(path);
}
public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- ensureProxy(path).merge(path, data);
+ ensureTransactionProxy(path).merge(path, data);
}
public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- ensureProxy(path).write(path, data);
- }
-
- private boolean ensureClosed() {
- final int local = state;
- if (local == CLOSED_STATE) {
- return false;
- }
-
- final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE);
- Preconditions.checkState(success, "Transaction %s raced during close", this);
- return true;
+ ensureTransactionProxy(path).write(path, data);
}
public DOMStoreThreePhaseCommitCohort ready() {
- Preconditions.checkState(ensureClosed(), "Attempted to submit a closed transaction %s", this);
-
- for (AbstractProxyTransaction p : proxies.values()) {
- p.seal();
- }
+ final Collection<AbstractProxyTransaction> toReady = ensureClosed();
+ Preconditions.checkState(toReady != null, "Attempted to submit a closed transaction %s", this);
+ toReady.forEach(AbstractProxyTransaction::seal);
final AbstractTransactionCommitCohort cohort;
- switch (proxies.size()) {
+ switch (toReady.size()) {
case 0:
- cohort = new EmptyTransactionCommitCohort(parent, transactionId);
+ cohort = new EmptyTransactionCommitCohort(parent(), getIdentifier());
break;
case 1:
- cohort = new DirectTransactionCommitCohort(parent, transactionId,
- Iterables.getOnlyElement(proxies.values()));
+ cohort = new DirectTransactionCommitCohort(parent(), getIdentifier(),
+ Iterables.getOnlyElement(toReady));
break;
default:
- cohort = new ClientTransactionCommitCohort(parent, transactionId, proxies.values());
+ cohort = new ClientTransactionCommitCohort(parent(), getIdentifier(), toReady);
break;
}
- return parent.onTransactionReady(transactionId, cohort);
- }
-
- /**
- * Release all state associated with this transaction.
- */
- public void abort() {
- if (commonAbort()) {
- parent.onTransactionAbort(transactionId);
- }
- }
-
- private boolean commonAbort() {
- if (!ensureClosed()) {
- return false;
- }
-
- for (AbstractProxyTransaction proxy : proxies.values()) {
- proxy.abort();
- }
- proxies.clear();
- return true;
- }
-
- @Override
- void localAbort(final Throwable cause) {
- LOG.debug("Local abort of transaction {}", getIdentifier(), cause);
- commonAbort();
- }
-
- Map<Long, AbstractProxyTransaction> getProxies() {
- return proxies;
+ return parent().onTransactionReady(this, cohort);
}
}
*/
@Nonnull ClientLocalHistory createLocalHistory();
+ /**
+ * Create a new free-standing snapshot.
+ *
+ * @return Client snapshot handle
+ */
+ @Nonnull ClientSnapshot createSnapshot();
+
/**
* Create a new free-standing transaction.
*
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
-import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
-import org.opendaylight.controller.cluster.access.commands.TransactionModification;
-import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @author Robert Varga
*/
@NotThreadSafe
-final class LocalProxyTransaction extends AbstractProxyTransaction {
+abstract class LocalProxyTransaction extends AbstractProxyTransaction {
private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class);
private final TransactionIdentifier identifier;
- private CursorAwareDataTreeModification modification;
- private CursorAwareDataTreeModification sealedModification;
-
- LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
- final CursorAwareDataTreeModification modification) {
+ LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
super(parent);
this.identifier = Preconditions.checkNotNull(identifier);
- this.modification = Preconditions.checkNotNull(modification);
}
@Override
- public TransactionIdentifier getIdentifier() {
+ public final TransactionIdentifier getIdentifier() {
return identifier;
}
- @Override
- void doDelete(final YangInstanceIdentifier path) {
- modification.delete(path);
- }
+ abstract DataTreeSnapshot readOnlyView();
- @Override
- void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- modification.merge(path, data);
- }
+ abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
+ @Nullable Consumer<Response<?, ?>> callback);
@Override
- void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- modification.write(path, data);
+ final CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
+ return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent());
}
@Override
- CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
- return Futures.immediateCheckedFuture(modification.readNode(path).isPresent());
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
+ return Futures.immediateCheckedFuture(readOnlyView().readNode(path));
}
@Override
- CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
- return Futures.immediateCheckedFuture(modification.readNode(path));
- }
-
- private RuntimeException abortedException() {
- return new IllegalStateException("Tracker " + identifier + " has been aborted");
- }
-
- private RuntimeException submittedException() {
- return new IllegalStateException("Tracker " + identifier + " has been submitted");
- }
-
- @Override
- void doAbort() {
+ final void doAbort() {
sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> {
LOG.debug("Transaction {} abort completed with {}", identifier, response);
});
}
- @Override
- CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
- final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, nextSequence(),
- localActor(), modification, coordinated);
- modification = new FailedDataTreeModification(this::submittedException);
- return ret;
- }
-
- @Override
- void doSeal() {
- modification.ready();
- sealedModification = modification;
- }
-
- @Override
- void flushState(final AbstractProxyTransaction successor) {
- sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
- @Override
- public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
- successor.write(current().node(child), data);
- }
-
- @Override
- public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
- successor.merge(current().node(child), data);
- }
-
- @Override
- public void delete(final PathArgument child) {
- successor.delete(current().node(child));
- }
- });
- }
-
- DataTreeSnapshot getSnapshot() {
- Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", identifier);
- return sealedModification;
- }
-
- private void applyModifyTransactionRequest(final ModifyTransactionRequest request,
- final @Nullable Consumer<Response<?, ?>> callback) {
- for (TransactionModification mod : request.getModifications()) {
- if (mod instanceof TransactionWrite) {
- modification.write(mod.getPath(), ((TransactionWrite)mod).getData());
- } else if (mod instanceof TransactionMerge) {
- modification.merge(mod.getPath(), ((TransactionMerge)mod).getData());
- } else if (mod instanceof TransactionDelete) {
- modification.delete(mod.getPath());
- } else {
- throw new IllegalArgumentException("Unsupported modification " + mod);
- }
- }
-
- final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
- if (maybeProtocol.isPresent()) {
- seal();
- Verify.verify(callback != null, "Request {} has null callback", request);
-
- switch (maybeProtocol.get()) {
- case ABORT:
- sendAbort(callback);
- break;
- case SIMPLE:
- sendRequest(commitRequest(false), callback);
- break;
- case THREE_PHASE:
- sendRequest(commitRequest(true), callback);
- break;
- default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
- }
- }
- }
-
@Override
void handleForwardedRemoteRequest(final TransactionRequest<?> request,
final @Nullable Consumer<Response<?, ?>> callback) {
- LOG.debug("Applying forwarded request {}", request);
-
if (request instanceof ModifyTransactionRequest) {
applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
} else if (request instanceof ReadTransactionRequest) {
final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
- final Optional<NormalizedNode<?, ?>> result = modification.readNode(path);
+ final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
} else if (request instanceof ExistsTransactionRequest) {
final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
- final boolean result = modification.readNode(path).isPresent();
+ final boolean result = readOnlyView().readNode(path).isPresent();
callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
- } else if (request instanceof TransactionPreCommitRequest) {
- sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
- } else if (request instanceof TransactionDoCommitRequest) {
- sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
- } else if (request instanceof TransactionAbortRequest) {
- sendAbort(callback);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
final Consumer<Response<?, ?>> callback) {
if (request instanceof AbortLocalTransactionRequest) {
successor.sendAbort(request, callback);
- } else if (request instanceof CommitLocalTransactionRequest) {
- successor.sendCommit((CommitLocalTransactionRequest)request, callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
LOG.debug("Forwarded request {} to successor {}", request, successor);
}
- private void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
sendRequest(request, callback);
- modification = new FailedDataTreeModification(this::abortedException);
- }
-
- private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
- // Rebase old modification on new data tree.
- try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) {
- request.getModification().applyToCursor(cursor);
- }
-
- seal();
- sendRequest(commitRequest(request.isCoordinated()), callback);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A read-only specialization of {@link LocalProxyTransaction}.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalReadOnlyProxyTransaction.class);
+
+ private final DataTreeSnapshot snapshot;
+
+ LocalReadOnlyProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
+ final DataTreeSnapshot snapshot) {
+ super(parent, identifier);
+ this.snapshot = Preconditions.checkNotNull(snapshot);
+ }
+
+ @Override
+ boolean isSnapshotOnly() {
+ return true;
+ }
+
+ @Override
+ DataTreeSnapshot readOnlyView() {
+ return snapshot;
+ }
+
+ @Override
+ void doDelete(final YangInstanceIdentifier path) {
+ throw new UnsupportedOperationException("Read-only snapshot");
+ }
+
+ @Override
+ void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ throw new UnsupportedOperationException("Read-only snapshot");
+ }
+
+ @Override
+ void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ throw new UnsupportedOperationException("Read-only snapshot");
+ }
+
+ @Override
+ CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
+ throw new UnsupportedOperationException("Read-only snapshot");
+ }
+
+ @Override
+ void doSeal() {
+ // No-op
+ }
+
+ @Override
+ void flushState(final AbstractProxyTransaction successor) {
+ // No-op
+ }
+
+ @Override
+ void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+ final Consumer<Response<?, ?>> callback) {
+ Verify.verify(request.getModifications().isEmpty());
+
+ final PersistenceProtocol protocol = request.getPersistenceProtocol().get();
+ Verify.verify(protocol == PersistenceProtocol.ABORT);
+ abort();
+ }
+
+ @Override
+ void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) {
+ if (request instanceof CommitLocalTransactionRequest) {
+ final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
+ final DataTreeModification mod = req.getModification();
+
+ LOG.debug("Applying modification {} to successor {}", mod, successor);
+ mod.applyToCursor(new AbstractDataTreeModificationCursor() {
+ @Override
+ public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+ successor.write(current().node(child), data);
+ }
+
+ @Override
+ public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+ successor.merge(current().node(child), data);
+ }
+
+ @Override
+ public void delete(final PathArgument child) {
+ successor.delete(current().node(child));
+ }
+ });
+
+ successor.seal();
+
+ final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
+ successor.sendRequest(successorReq, callback);
+ } else if (request instanceof AbortLocalTransactionRequest) {
+ LOG.debug("Forwarding abort {} to successor {}", request, successor);
+ successor.abort();
+ } else {
+ throw new IllegalArgumentException("Unhandled request" + request);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with
+ * the client instance.
+ *
+ * <p>
+ * It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations
+ * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard
+ * leader.
+ *
+ * <p>
+ * This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the
+ * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalReadWriteProxyTransaction.class);
+
+ private CursorAwareDataTreeModification modification;
+ private CursorAwareDataTreeModification sealedModification;
+
+ LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
+ final DataTreeSnapshot snapshot) {
+ super(parent, identifier);
+ this.modification = (CursorAwareDataTreeModification) snapshot.newModification();
+ }
+
+ @Override
+ boolean isSnapshotOnly() {
+ return false;
+ }
+
+ @Override
+ CursorAwareDataTreeSnapshot readOnlyView() {
+ return modification;
+ }
+
+ @Override
+ void doDelete(final YangInstanceIdentifier path) {
+ modification.delete(path);
+ }
+
+ @Override
+ void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ modification.merge(path, data);
+ }
+
+ @Override
+ void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ modification.write(path, data);
+ }
+
+ private RuntimeException abortedException() {
+ return new IllegalStateException("Tracker " + getIdentifier() + " has been aborted");
+ }
+
+ private RuntimeException submittedException() {
+ return new IllegalStateException("Tracker " + getIdentifier() + " has been submitted");
+ }
+
+ @Override
+ CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
+ final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(getIdentifier(), nextSequence(),
+ localActor(), modification, coordinated);
+ modification = new FailedDataTreeModification(this::submittedException);
+ return ret;
+ }
+
+ @Override
+ void doSeal() {
+ modification.ready();
+ sealedModification = modification;
+ }
+
+ @Override
+ void flushState(final AbstractProxyTransaction successor) {
+ sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
+ @Override
+ public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+ successor.write(current().node(child), data);
+ }
+
+ @Override
+ public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+ successor.merge(current().node(child), data);
+ }
+
+ @Override
+ public void delete(final PathArgument child) {
+ successor.delete(current().node(child));
+ }
+ });
+ }
+
+ DataTreeSnapshot getSnapshot() {
+ Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", getIdentifier());
+ return sealedModification;
+ }
+
+ @Override
+ void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+ final @Nullable Consumer<Response<?, ?>> callback) {
+ for (TransactionModification mod : request.getModifications()) {
+ if (mod instanceof TransactionWrite) {
+ modification.write(mod.getPath(), ((TransactionWrite)mod).getData());
+ } else if (mod instanceof TransactionMerge) {
+ modification.merge(mod.getPath(), ((TransactionMerge)mod).getData());
+ } else if (mod instanceof TransactionDelete) {
+ modification.delete(mod.getPath());
+ } else {
+ throw new IllegalArgumentException("Unsupported modification " + mod);
+ }
+ }
+
+ final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
+ if (maybeProtocol.isPresent()) {
+ seal();
+ Verify.verify(callback != null, "Request {} has null callback", request);
+
+ switch (maybeProtocol.get()) {
+ case ABORT:
+ sendAbort(callback);
+ break;
+ case SIMPLE:
+ sendRequest(commitRequest(false), callback);
+ break;
+ case THREE_PHASE:
+ sendRequest(commitRequest(true), callback);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
+ }
+ }
+ }
+
+ @Override
+ void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+ final @Nullable Consumer<Response<?, ?>> callback) {
+ LOG.debug("Applying forwarded request {}", request);
+
+ if (request instanceof TransactionPreCommitRequest) {
+ sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionDoCommitRequest) {
+ sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionAbortRequest) {
+ sendAbort(callback);
+ } else {
+ super.handleForwardedRemoteRequest(request, callback);
+ }
+ }
+
+ @Override
+ void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) {
+ if (request instanceof CommitLocalTransactionRequest) {
+ final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
+ final DataTreeModification mod = req.getModification();
+
+ LOG.debug("Applying modification {} to successor {}", mod, successor);
+ mod.applyToCursor(new AbstractDataTreeModificationCursor() {
+ @Override
+ public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+ successor.write(current().node(child), data);
+ }
+
+ @Override
+ public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+ successor.merge(current().node(child), data);
+ }
+
+ @Override
+ public void delete(final PathArgument child) {
+ successor.delete(current().node(child));
+ }
+ });
+
+ successor.seal();
+
+ final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
+ successor.sendRequest(successorReq, callback);
+ } else if (request instanceof AbortLocalTransactionRequest) {
+ LOG.debug("Forwarding abort {} to successor {}", request, successor);
+ successor.abort();
+ } else {
+ throw new IllegalArgumentException("Unhandled request" + request);
+ }
+ }
+
+ @Override
+ void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) {
+ if (request instanceof CommitLocalTransactionRequest) {
+ Verify.verify(successor instanceof LocalReadWriteProxyTransaction);
+ ((LocalReadWriteProxyTransaction) successor).sendCommit((CommitLocalTransactionRequest)request, callback);
+ LOG.debug("Forwarded request {} to successor {}", request, successor);
+ } else {
+ super.forwardToLocal(successor, request, callback);
+ }
+ }
+
+ @Override
+ void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ super.sendAbort(request, callback);
+ modification = new FailedDataTreeModification(this::abortedException);
+ }
+
+ private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+ // Rebase old modification on new data tree.
+ try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) {
+ request.getModification().applyToCursor(cursor);
+ }
+
+ seal();
+ sendRequest(commitRequest(request.isCoordinated()), callback);
+ }
+}
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
@Override
final AbstractProxyTransaction doCreateTransactionProxy(
- final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
- return new RemoteProxyTransaction(this, txId);
+ final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId,
+ final boolean snapshotOnly) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly);
}
}
private static final class Local extends AbstractLocal {
- private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
+ private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
// Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
// the open one and attempts to create a new transaction again.
- private LocalProxyTransaction lastOpen;
+ private LocalReadWriteProxyTransaction lastOpen;
- private volatile LocalProxyTransaction lastSealed;
+ private volatile LocalReadWriteProxyTransaction lastSealed;
Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
final DataTree dataTree) {
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId) {
+ final TransactionIdentifier txId, final boolean snapshotOnly) {
Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
// onTransactionCompleted() runs concurrently
- final LocalProxyTransaction localSealed = lastSealed;
+ final LocalReadWriteProxyTransaction localSealed = lastSealed;
final DataTreeSnapshot baseSnapshot;
if (localSealed != null) {
baseSnapshot = localSealed.getSnapshot();
baseSnapshot = takeSnapshot();
}
- lastOpen = new LocalProxyTransaction(this, txId,
- (CursorAwareDataTreeModification) baseSnapshot.newModification());
+ if (snapshotOnly) {
+ return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
+ }
+
+ lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
LOG.debug("Proxy {} open transaction {}", this, lastOpen);
return lastOpen;
}
@Override
void onTransactionAborted(final AbstractProxyTransaction tx) {
- Preconditions.checkState(tx.equals(lastOpen));
- lastOpen = null;
+ if (tx.equals(lastOpen)) {
+ lastOpen = null;
+ }
}
@Override
void onTransactionCompleted(final AbstractProxyTransaction tx) {
Verify.verify(tx instanceof LocalProxyTransaction);
-
- if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
- LOG.debug("Completed last sealed transaction {}", tx);
+ if (tx instanceof LocalReadWriteProxyTransaction) {
+ if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
+ LOG.debug("Completed last sealed transaction {}", tx);
+ }
}
}
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId) {
- return new LocalProxyTransaction(this, txId,
- (CursorAwareDataTreeModification) takeSnapshot().newModification());
+ final TransactionIdentifier txId, final boolean snapshotOnly) {
+ final DataTreeSnapshot snapshot = takeSnapshot();
+ return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
+ new LocalReadWriteProxyTransaction(this, txId, snapshot);
}
@Override
for (AbstractProxyTransaction t : proxies.values()) {
LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
- final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
+ final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
+ t.isSnapshotOnly());
LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
t.replayMessages(newProxy, previousEntries);
}
return connection.localActor();
}
- final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
+ final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+ final boolean snapshotOnly) {
lock.lock();
try {
if (successor != null) {
- return successor.createTransactionProxy(txId);
+ return successor.createTransactionProxy(txId, snapshotOnly);
}
final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
- final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
+ final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
proxies.put(proxyId, ret);
LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
return ret;
@GuardedBy("lock")
abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
- TransactionIdentifier txId);
+ TransactionIdentifier txId, boolean snapshotOnly);
abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
private static final int REQUEST_MAX_MODIFICATIONS = 1000;
private final ModifyTransactionRequestBuilder builder;
+ private final boolean snapshotOnly;
private boolean builderBusy;
private volatile Exception operationFailure;
- RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
+ RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
+ final boolean snapshotOnly) {
super(parent);
+ this.snapshotOnly = snapshotOnly;
builder = new ModifyTransactionRequestBuilder(identifier, localActor());
}
+ @Override
+ boolean isSnapshotOnly() {
+ return snapshotOnly;
+ }
+
@Override
public TransactionIdentifier getIdentifier() {
return builder.getIdentifier();
@Override
CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
final SettableFuture<Boolean> future = SettableFuture.create();
- return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path),
- t -> completeExists(future, t), future);
+ return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
+ isSnapshotOnly()), t -> completeExists(future, t), future);
}
@Override
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
- return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path),
- t -> completeRead(future, t), future);
+ return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
+ isSnapshotOnly()), t -> completeRead(future, t), future);
}
@Override
} else if (request instanceof ReadTransactionRequest) {
ensureFlushedBuider();
sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ReadTransactionRequest) request).getPath()), callback);
+ ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
} else if (request instanceof ExistsTransactionRequest) {
ensureFlushedBuider();
sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ExistsTransactionRequest) request).getPath()), callback);
+ ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
} else if (request instanceof TransactionPreCommitRequest) {
ensureFlushedBuider();
sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
super(client, identifier);
}
+ @Override
+ ClientSnapshot doCreateSnapshot() {
+ final TransactionIdentifier txId = new TransactionIdentifier(getIdentifier(), nextTx());
+ LOG.debug("{}: creating a new snapshot {}", this, txId);
+
+ return new ClientSnapshot(this, txId);
+ }
+
@Override
ClientTransaction doCreateTransaction() {
final TransactionIdentifier txId = new TransactionIdentifier(getIdentifier(), nextTx());
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
throw UNSEQUENCED_START;
}
- if (request instanceof CommitLocalTransactionRequest) {
- tx = createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
- LOG.debug("{}: allocated new ready transaction {}", persistenceId(), id);
- } else {
- tx = createOpenTransaction(id);
- LOG.debug("{}: allocated new open transaction {}", persistenceId(), id);
- }
-
+ tx = createTransaction(request, id);
transactions.put(id, tx);
} else {
final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
return tx.handleRequest(request, envelope, now);
}
+ private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
+ throws RequestException {
+ if (request instanceof CommitLocalTransactionRequest) {
+ LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id);
+ return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
+ }
+ if (request instanceof AbstractReadTransactionRequest) {
+ if (((AbstractReadTransactionRequest<?>) request).isSnapshotOnly()) {
+ LOG.debug("{}: allocatint new open snapshot {}", persistenceId(), id);
+ return createOpenSnapshot(id);
+ }
+ }
+
+ LOG.debug("{}: allocating new open transaction {}", persistenceId(), id);
+ return createOpenTransaction(id);
+ }
+
+ abstract FrontendTransaction createOpenSnapshot(TransactionIdentifier id) throws RequestException;
+
abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException;
abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod)
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Read-only frontend transaction state as observed by the shard leader.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class FrontendReadOnlyTransaction extends FrontendTransaction {
+ private final ReadOnlyShardDataTreeTransaction openTransaction;
+
+ private FrontendReadOnlyTransaction(final AbstractFrontendHistory history,
+ final ReadOnlyShardDataTreeTransaction transaction) {
+ super(history, transaction.getIdentifier());
+ this.openTransaction = Preconditions.checkNotNull(transaction);
+ }
+
+ static FrontendReadOnlyTransaction create(final AbstractFrontendHistory history,
+ final ReadOnlyShardDataTreeTransaction transaction) {
+ return new FrontendReadOnlyTransaction(history, transaction);
+ }
+
+ // Sequence has already been checked
+ @Override
+ @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
+ if (request instanceof ExistsTransactionRequest) {
+ return handleExistsTransaction((ExistsTransactionRequest) request);
+ } else if (request instanceof ReadTransactionRequest) {
+ return handleReadTransaction((ReadTransactionRequest) request);
+ } else if (request instanceof TransactionAbortRequest) {
+ return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+ } else {
+ throw new UnsupportedRequestException(request);
+ }
+ }
+
+ private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ openTransaction.abort();
+ return new TransactionAbortSuccess(openTransaction.getIdentifier(), request.getSequence());
+ }
+
+ private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
+ throws RequestException {
+ final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+ return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(openTransaction.getIdentifier(),
+ request.getSequence(), data.isPresent()));
+ }
+
+ private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
+ throws RequestException {
+ final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+ return recordSuccess(request.getSequence(), new ReadTransactionSuccess(openTransaction.getIdentifier(),
+ request.getSequence(), data));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Frontend read-write transaction state as observed by the shard leader.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class FrontendReadWriteTransaction extends FrontendTransaction {
+ private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
+
+ private ReadWriteShardDataTreeTransaction openTransaction;
+ private DataTreeModification sealedModification;
+ private ShardDataTreeCohort readyCohort;
+
+ private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
+ final ReadWriteShardDataTreeTransaction transaction) {
+ super(history, id);
+ this.openTransaction = Preconditions.checkNotNull(transaction);
+ }
+
+ private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
+ final DataTreeModification mod) {
+ super(history, id);
+ this.sealedModification = Preconditions.checkNotNull(mod);
+ }
+
+ static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history,
+ final ReadWriteShardDataTreeTransaction transaction) {
+ return new FrontendReadWriteTransaction(history, transaction.getIdentifier(), transaction);
+ }
+
+ static FrontendReadWriteTransaction createReady(final AbstractFrontendHistory history,
+ final TransactionIdentifier id, final DataTreeModification mod) {
+ return new FrontendReadWriteTransaction(history, id, mod);
+ }
+
+ // Sequence has already been checked
+ @Override
+ @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
+ if (request instanceof ModifyTransactionRequest) {
+ return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
+ } else if (request instanceof CommitLocalTransactionRequest) {
+ handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
+ return null;
+ } else if (request instanceof ExistsTransactionRequest) {
+ return handleExistsTransaction((ExistsTransactionRequest) request);
+ } else if (request instanceof ReadTransactionRequest) {
+ return handleReadTransaction((ReadTransactionRequest) request);
+ } else if (request instanceof TransactionPreCommitRequest) {
+ handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
+ return null;
+ } else if (request instanceof TransactionDoCommitRequest) {
+ handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
+ return null;
+ } else if (request instanceof TransactionAbortRequest) {
+ return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+ } else {
+ throw new UnsupportedRequestException(request);
+ }
+ }
+
+ private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+ @Override
+ public void onSuccess(final DataTreeCandidate result) {
+ recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
+ request.getSequence()));
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
+ readyCohort = null;
+ }
+ });
+ }
+
+ private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
+ readyCohort.commit(new FutureCallback<UnsignedLong>() {
+ @Override
+ public void onSuccess(final UnsignedLong result) {
+ successfulCommit(envelope, now);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
+ readyCohort = null;
+ }
+ });
+ }
+
+ private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ if (readyCohort == null) {
+ openTransaction.abort();
+ return new TransactionAbortSuccess(getIdentifier(), request.getSequence());
+ }
+
+ readyCohort.abort(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ readyCohort = null;
+ recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+ request.getSequence()));
+ LOG.debug("Transaction {} aborted", getIdentifier());
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ readyCohort = null;
+ LOG.warn("Transaction {} abort failed", getIdentifier(), failure);
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
+ }
+ });
+ return null;
+ }
+
+ private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
+ readyCohort.canCommit(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
+ envelope.getMessage().getSequence()));
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+ readyCohort = null;
+ }
+ });
+ }
+
+ private void directCommit(final RequestEnvelope envelope, final long now) {
+ readyCohort.canCommit(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ successfulDirectCanCommit(envelope, now);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+ readyCohort = null;
+ }
+ });
+
+ }
+
+ private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
+ readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+ @Override
+ public void onSuccess(final DataTreeCandidate result) {
+ successfulDirectPreCommit(envelope, startTime);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
+ readyCohort = null;
+ }
+ });
+ }
+
+ private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
+ readyCohort.commit(new FutureCallback<UnsignedLong>() {
+
+ @Override
+ public void onSuccess(final UnsignedLong result) {
+ successfulCommit(envelope, startTime);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
+ readyCohort = null;
+ }
+ });
+ }
+
+ private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+ recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+ envelope.getMessage().getSequence()));
+ readyCohort = null;
+ }
+
+ private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ if (sealedModification.equals(request.getModification())) {
+ readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+
+ if (request.isCoordinated()) {
+ coordinatedCommit(envelope, now);
+ } else {
+ directCommit(envelope, now);
+ }
+ } else {
+ throw new UnsupportedRequestException(request);
+ }
+ }
+
+ private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
+ throws RequestException {
+ final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+ return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(),
+ data.isPresent()));
+ }
+
+ private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
+ throws RequestException {
+ final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+ return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(),
+ data));
+ }
+
+ private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
+ return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence));
+ }
+
+ private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+
+ final DataTreeModification modification = openTransaction.getSnapshot();
+ for (TransactionModification m : request.getModifications()) {
+ if (m instanceof TransactionDelete) {
+ modification.delete(m.getPath());
+ } else if (m instanceof TransactionWrite) {
+ modification.write(m.getPath(), ((TransactionWrite) m).getData());
+ } else if (m instanceof TransactionMerge) {
+ modification.merge(m.getPath(), ((TransactionMerge) m).getData());
+ } else {
+ LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+ }
+ }
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
+ if (!maybeProto.isPresent()) {
+ return replyModifySuccess(request.getSequence());
+ }
+
+ switch (maybeProto.get()) {
+ case ABORT:
+ openTransaction.abort();
+ openTransaction = null;
+ return replyModifySuccess(request.getSequence());
+ case SIMPLE:
+ readyCohort = openTransaction.ready();
+ openTransaction = null;
+ directCommit(envelope, now);
+ return null;
+ case THREE_PHASE:
+ readyCohort = openTransaction.ready();
+ openTransaction = null;
+ coordinatedCommit(envelope, now);
+ return null;
+ default:
+ throw new UnsupportedRequestException(request);
+ }
+ }
+}
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.FutureCallback;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
-import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
-import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
-import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
-import org.opendaylight.controller.cluster.access.commands.TransactionModification;
-import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.concepts.Identifiable;
/**
- * Frontend transaction state as observed by the shard leader.
+ * Frontend common transaction state as observed by the shard leader.
*
* @author Robert Varga
*/
@NotThreadSafe
-final class FrontendTransaction {
- private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class);
-
+abstract class FrontendTransaction implements Identifiable<TransactionIdentifier> {
private final AbstractFrontendHistory history;
private final TransactionIdentifier id;
private Long lastPurgedSequence;
private long expectedSequence;
- private ReadWriteShardDataTreeTransaction openTransaction;
- private DataTreeModification sealedModification;
- private ShardDataTreeCohort readyCohort;
-
- private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
- final ReadWriteShardDataTreeTransaction transaction) {
+ FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id) {
this.history = Preconditions.checkNotNull(history);
this.id = Preconditions.checkNotNull(id);
- this.openTransaction = Preconditions.checkNotNull(transaction);
}
- private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
- final DataTreeModification mod) {
- this.history = Preconditions.checkNotNull(history);
- this.id = Preconditions.checkNotNull(id);
- this.sealedModification = Preconditions.checkNotNull(mod);
+ @Override
+ public final TransactionIdentifier getIdentifier() {
+ return id;
}
- static FrontendTransaction createOpen(final AbstractFrontendHistory history,
- final ReadWriteShardDataTreeTransaction transaction) {
- return new FrontendTransaction(history, transaction.getIdentifier(), transaction);
+ final AbstractFrontendHistory history() {
+ return history;
}
- static FrontendTransaction createReady(final AbstractFrontendHistory history, final TransactionIdentifier id,
- final DataTreeModification mod) {
- return new FrontendTransaction(history, id, mod);
- }
-
- java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
+ final java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
// Fast path check: if the requested sequence is the next request, bail early
if (expectedSequence == sequence) {
return java.util.Optional.empty();
return java.util.Optional.empty();
}
- void purgeSequencesUpTo(final long sequence) {
+ final void purgeSequencesUpTo(final long sequence) {
// FIXME: implement this
lastPurgedSequence = sequence;
}
// Sequence has already been checked
- @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
- final long now) throws RequestException {
- if (request instanceof ModifyTransactionRequest) {
- return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
- } else if (request instanceof CommitLocalTransactionRequest) {
- handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
- return null;
- } else if (request instanceof ExistsTransactionRequest) {
- return handleExistsTransaction((ExistsTransactionRequest) request);
- } else if (request instanceof ReadTransactionRequest) {
- return handleReadTransaction((ReadTransactionRequest) request);
- } else if (request instanceof TransactionPreCommitRequest) {
- handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
- return null;
- } else if (request instanceof TransactionDoCommitRequest) {
- handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
- return null;
- } else if (request instanceof TransactionAbortRequest) {
- return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
- } else {
- throw new UnsupportedRequestException(request);
- }
- }
+ abstract @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request,
+ final RequestEnvelope envelope, final long now) throws RequestException;
private void recordResponse(final long sequence, final Object response) {
if (replayQueue.isEmpty()) {
expectedSequence++;
}
- private <T extends TransactionSuccess<?>> T recordSuccess(final long sequence, final T success) {
+ final <T extends TransactionSuccess<?>> T recordSuccess(final long sequence, final T success) {
recordResponse(sequence, success);
return success;
}
return history.readTime() - startTime;
}
- private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
+ final void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
final TransactionSuccess<?> success) {
recordResponse(success.getSequence(), success);
envelope.sendSuccess(success, executionTime(startTime));
}
- private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
+ final void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
final RuntimeRequestException failure) {
recordResponse(envelope.getMessage().getSequence(), failure);
envelope.sendFailure(failure, executionTime(startTime));
}
-
- private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
- @Override
- public void onSuccess(final DataTreeCandidate result) {
- recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
- request.getSequence()));
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
- final long now) throws RequestException {
- readyCohort.commit(new FutureCallback<UnsignedLong>() {
- @Override
- public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope, now);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- if (readyCohort == null) {
- openTransaction.abort();
- return new TransactionAbortSuccess(id, request.getSequence());
- }
-
- readyCohort.abort(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- readyCohort = null;
- recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence()));
- LOG.debug("Transaction {} aborted", id);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- readyCohort = null;
- LOG.warn("Transaction {} abort failed", id, failure);
- recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
- }
- });
- return null;
- }
-
- private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
- readyCohort.canCommit(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
- envelope.getMessage().getSequence()));
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void directCommit(final RequestEnvelope envelope, final long now) {
- readyCohort.canCommit(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- successfulDirectCanCommit(envelope, now);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
- readyCohort = null;
- }
- });
-
- }
-
- private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
- readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
- @Override
- public void onSuccess(final DataTreeCandidate result) {
- successfulDirectPreCommit(envelope, startTime);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
- readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
- @Override
- public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope, startTime);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
- recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
- envelope.getMessage().getSequence()));
- readyCohort = null;
- }
-
- private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- if (sealedModification.equals(request.getModification())) {
- readyCohort = history.createReadyCohort(id, sealedModification);
-
- if (request.isCoordinated()) {
- coordinatedCommit(envelope, now);
- } else {
- directCommit(envelope, now);
- }
- } else {
- throw new UnsupportedRequestException(request);
- }
- }
-
- private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
- throws RequestException {
- final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
- return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(),
- data.isPresent()));
- }
-
- private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
- throws RequestException {
- final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
- return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
- }
-
- private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
- return recordSuccess(sequence, new ModifyTransactionSuccess(id, sequence));
- }
-
- private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
-
- final DataTreeModification modification = openTransaction.getSnapshot();
- for (TransactionModification m : request.getModifications()) {
- if (m instanceof TransactionDelete) {
- modification.delete(m.getPath());
- } else if (m instanceof TransactionWrite) {
- modification.write(m.getPath(), ((TransactionWrite) m).getData());
- } else if (m instanceof TransactionMerge) {
- modification.merge(m.getPath(), ((TransactionMerge) m).getData());
- } else {
- LOG.warn("{}: ignoring unhandled modification {}", history.persistenceId(), m);
- }
- }
-
- final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
- if (!maybeProto.isPresent()) {
- return replyModifySuccess(request.getSequence());
- }
-
- switch (maybeProto.get()) {
- case ABORT:
- openTransaction.abort();
- openTransaction = null;
- return replyModifySuccess(request.getSequence());
- case SIMPLE:
- readyCohort = openTransaction.ready();
- openTransaction = null;
- directCommit(envelope, now);
- return null;
- case THREE_PHASE:
- readyCohort = openTransaction.ready();
- openTransaction = null;
- coordinatedCommit(envelope, now);
- return null;
- default:
- throw new UnsupportedRequestException(request);
- }
- }
}
return chain.getIdentifier();
}
+ @Override
+ FrontendTransaction createOpenSnapshot(final TransactionIdentifier id) throws RequestException {
+ checkDeadTransaction(id);
+ lastSeenTransaction = id.getTransactionId();
+ return FrontendReadOnlyTransaction.create(this, chain.newReadOnlyTransaction(id));
+ }
+
@Override
FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException {
checkDeadTransaction(id);
lastSeenTransaction = id.getTransactionId();
- return FrontendTransaction.createOpen(this, chain.newReadWriteTransaction(id));
+ return FrontendReadWriteTransaction.createOpen(this, chain.newReadWriteTransaction(id));
}
@Override
throws RequestException {
checkDeadTransaction(id);
lastSeenTransaction = id.getTransactionId();
- return FrontendTransaction.createReady(this, id, mod);
+ return FrontendReadWriteTransaction.createReady(this, id, mod);
}
@Override
return identifier;
}
+ @Override
+ FrontendTransaction createOpenSnapshot(final TransactionIdentifier id) throws RequestException {
+ return FrontendReadOnlyTransaction.create(this, tree.newReadOnlyTransaction(id));
+ }
+
@Override
FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException {
- return FrontendTransaction.createOpen(this, tree.newReadWriteTransaction(id));
+ return FrontendReadWriteTransaction.createOpen(this, tree.newReadWriteTransaction(id));
}
@Override
FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod)
throws RequestException {
- return FrontendTransaction.createReady(this, id, mod);
+ return FrontendReadWriteTransaction.createReady(this, id, mod);
}
@Override