BUG-5280: add basic concept of ClientSnapshot 27/48727/32
authorRobert Varga <rovarga@cisco.com>
Fri, 25 Nov 2016 12:55:13 +0000 (13:55 +0100)
committerRobert Varga <rovarga@cisco.com>
Mon, 12 Dec 2016 19:27:55 +0000 (20:27 +0100)
In order to accurately read-only transactions with ClientLocalHistory,
we need to differentiate between Transactions and Snapshots. This patch
introduces the concept, its API and backend signalling/implementation.

State keeping is reworked so it requires only a single field, which
is manipulated via an atonic updater, with null signifying state has
already been closed (or is in process of being taken care of).

Change-Id: I2f8fd5ffdff366d1948538299b96721b756c620c
Signed-off-by: Robert Varga <rovarga@cisco.com>
26 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DataStoreClient.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java

index 17054f7..6510d5b 100644 (file)
@@ -33,16 +33,19 @@ public abstract class AbstractReadTransactionRequest<T extends AbstractReadTrans
         extends TransactionRequest<T> {
     private static final long serialVersionUID = 1L;
     private final YangInstanceIdentifier path;
+    private final boolean snapshotOnly;
 
     AbstractReadTransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo,
-        final YangInstanceIdentifier path) {
+        final YangInstanceIdentifier path, final boolean snapshotOnly) {
         super(identifier, sequence, replyTo);
         this.path = Preconditions.checkNotNull(path);
+        this.snapshotOnly = snapshotOnly;
     }
 
     AbstractReadTransactionRequest(final T request, final ABIVersion version) {
         super(request, version);
         this.path = request.getPath();
+        this.snapshotOnly = request.isSnapshotOnly();
     }
 
     @Nonnull
@@ -50,6 +53,10 @@ public abstract class AbstractReadTransactionRequest<T extends AbstractReadTrans
         return path;
     }
 
+    public final boolean isSnapshotOnly() {
+        return snapshotOnly;
+    }
+
     @Override
     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
         return super.addToStringAttributes(toStringHelper).add("path", path);
index d3a60af..51c60e1 100644 (file)
@@ -28,6 +28,7 @@ abstract class AbstractReadTransactionRequestProxyV1<T extends AbstractReadTrans
         extends AbstractTransactionRequestProxy<T> {
     private static final long serialVersionUID = 1L;
     private YangInstanceIdentifier path;
+    private boolean snapshotOnly;
 
     protected AbstractReadTransactionRequestProxyV1() {
         // For Externalizable
@@ -36,6 +37,7 @@ abstract class AbstractReadTransactionRequestProxyV1<T extends AbstractReadTrans
     AbstractReadTransactionRequestProxyV1(final T request) {
         super(request);
         path = request.getPath();
+        snapshotOnly = request.isSnapshotOnly();
     }
 
     @Override
@@ -44,19 +46,21 @@ abstract class AbstractReadTransactionRequestProxyV1<T extends AbstractReadTrans
         try (NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out)) {
             nnout.writeYangInstanceIdentifier(path);
         }
+        out.writeBoolean(snapshotOnly);
     }
 
     @Override
     public final void readExternal(final ObjectInput in) throws ClassNotFoundException, IOException {
         super.readExternal(in);
         path = NormalizedNodeInputOutput.newDataInput(in).readYangInstanceIdentifier();
+        snapshotOnly = in.readBoolean();
     }
 
     @Override
     protected final T createRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
-        return createReadRequest(target, sequence, replyTo, path);
+        return createReadRequest(target, sequence, replyTo, path, snapshotOnly);
     }
 
     abstract T createReadRequest(TransactionIdentifier target, long sequence, ActorRef replyTo,
-            YangInstanceIdentifier requestPath);
+            YangInstanceIdentifier requestPath, boolean snapshotOnly);
 }
index b862117..1702f14 100644 (file)
@@ -24,8 +24,8 @@ public final class ExistsTransactionRequest extends AbstractReadTransactionReque
     private static final long serialVersionUID = 1L;
 
     public ExistsTransactionRequest(@Nonnull final TransactionIdentifier identifier, final long sequence,
-            @Nonnull final ActorRef replyTo, @Nonnull final YangInstanceIdentifier path) {
-        super(identifier, sequence, replyTo, path);
+            @Nonnull final ActorRef replyTo, @Nonnull final YangInstanceIdentifier path, final boolean snapshotOnly) {
+        super(identifier, sequence, replyTo, path, snapshotOnly);
     }
 
     private ExistsTransactionRequest(final ExistsTransactionRequest request, final ABIVersion version) {
index e95da8d..6a6ede1 100644 (file)
@@ -33,7 +33,7 @@ final class ExistsTransactionRequestProxyV1 extends AbstractReadTransactionReque
 
     @Override
     ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
-            final ActorRef replyTo, final YangInstanceIdentifier path) {
-        return new ExistsTransactionRequest(target, sequence, replyTo, path);
+            final ActorRef replyTo, final YangInstanceIdentifier path, final boolean snapshotOnly) {
+        return new ExistsTransactionRequest(target, sequence, replyTo, path, snapshotOnly);
     }
 }
index 14bb6b4..a6bc701 100644 (file)
@@ -24,8 +24,8 @@ public final class ReadTransactionRequest extends AbstractReadTransactionRequest
     private static final long serialVersionUID = 1L;
 
     public ReadTransactionRequest(@Nonnull final TransactionIdentifier identifier, final long sequence,
-            @Nonnull final ActorRef replyTo, @Nonnull final YangInstanceIdentifier path) {
-        super(identifier, sequence, replyTo, path);
+            @Nonnull final ActorRef replyTo, @Nonnull final YangInstanceIdentifier path, final boolean snapshotOnly) {
+        super(identifier, sequence, replyTo, path, snapshotOnly);
     }
 
     private ReadTransactionRequest(final ReadTransactionRequest request, final ABIVersion version) {
index a438a8b..153343d 100644 (file)
@@ -33,7 +33,7 @@ final class ReadTransactionRequestProxyV1 extends AbstractReadTransactionRequest
 
     @Override
     ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
-            final ActorRef replyTo, final YangInstanceIdentifier path) {
-        return new ReadTransactionRequest(target, sequence, replyTo, path);
+            final ActorRef replyTo, final YangInstanceIdentifier path, final boolean snapshotOnly) {
+        return new ReadTransactionRequest(target, sequence, replyTo, path, snapshotOnly);
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java
new file mode 100644 (file)
index 0000000..b87819c
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract superclass of both ClientSnapshot and ClientTransaction. Provided for convenience.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public abstract class AbstractClientHandle<T extends AbstractProxyTransaction> extends LocalAbortable
+        implements Identifiable<TransactionIdentifier> {
+    /*
+     * Our state consist of the the proxy map, hence we just subclass ConcurrentHashMap directly.
+     */
+    private static final class State<T> extends ConcurrentHashMap<Long, T> {
+        private static final long serialVersionUID = 1L;
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHandle.class);
+    @SuppressWarnings("rawtypes")
+    private static final AtomicReferenceFieldUpdater<AbstractClientHandle, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(AbstractClientHandle.class, State.class, "state");
+
+    private final TransactionIdentifier transactionId;
+    private final AbstractClientHistory parent;
+
+    private volatile State<T> state = new State<>();
+
+    // Hidden to prevent outside instantiation
+    AbstractClientHandle(final AbstractClientHistory parent, final TransactionIdentifier transactionId) {
+        this.transactionId = Preconditions.checkNotNull(transactionId);
+        this.parent = Preconditions.checkNotNull(parent);
+    }
+
+    @Override
+    public final TransactionIdentifier getIdentifier() {
+        return transactionId;
+    }
+
+    /**
+     * Release all state associated with this transaction.
+     *
+     * @return True if this transaction became closed during this call
+     */
+    public final boolean abort() {
+        if (commonAbort()) {
+            parent.onTransactionAbort(this);
+            return true;
+        }
+
+        return false;
+    }
+
+    private boolean commonAbort() {
+        final Collection<T> toClose = ensureClosed();
+        if (toClose == null) {
+            return false;
+        }
+
+        toClose.forEach(AbstractProxyTransaction::abort);
+        return true;
+    }
+
+    @Override
+    final void localAbort(final Throwable cause) {
+        LOG.debug("Local abort of transaction {}", getIdentifier(), cause);
+        commonAbort();
+    }
+
+    /**
+     * Make sure this snapshot is closed. If it became closed as the effect of this call, return a collection of
+     * {@link AbstractProxyTransaction} handles which need to be closed, too.
+     *
+     * @return null if this snapshot has already been closed, otherwise a collection of proxies, which need to be
+     *         closed, too.
+     */
+    @Nullable final Collection<T> ensureClosed() {
+        @SuppressWarnings("unchecked")
+        final State<T> local = STATE_UPDATER.getAndSet(this, null);
+        return local == null ? null : local.values();
+    }
+
+    final T ensureProxy(final YangInstanceIdentifier path, final Function<Long, T> createProxy) {
+        final Map<Long, T> local = getState();
+        final Long shard = parent.resolveShardForPath(path);
+
+        return local.computeIfAbsent(shard, createProxy);
+    }
+
+    final AbstractClientHistory parent() {
+        return parent;
+    }
+
+    private State<T> getState() {
+        final State<T> local = state;
+        Preconditions.checkState(local != null, "Transaction %s is closed", transactionId);
+        return local;
+    }
+}
index 519763a..1be8464 100644 (file)
@@ -49,7 +49,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
 
     @GuardedBy("this")
-    private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
+    private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
     @GuardedBy("this")
     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
 
@@ -99,7 +99,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
             LOG.debug("Force-closing history {}", getIdentifier(), cause);
 
             synchronized (this) {
-                for (ClientTransaction t : openTransactions.values()) {
+                for (AbstractClientHandle<?> t : openTransactions.values()) {
                     t.localAbort(cause);
                 }
                 openTransactions.clear();
@@ -136,32 +136,41 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         LOG.debug("Create history response {}", response);
     }
 
-    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
+    private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
         while (true) {
-            final ProxyHistory history;
             try {
-                history = histories.computeIfAbsent(shard, this::createHistoryProxy);
+                return histories.computeIfAbsent(shard, this::createHistoryProxy);
             } catch (InversibleLockException e) {
                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
                 e.awaitResolution();
                 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
-                continue;
             }
+        }
+    }
+
+    final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) {
+        return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
+    }
+
+    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
+        return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
+    }
 
-            return history.createTransactionProxy(transactionId);
+    private void checkNotClosed() {
+        if (state == State.CLOSED) {
+            throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
         }
     }
 
     /**
-     * Allocate a {@link ClientTransaction}.
+     * Allocate a new {@link ClientTransaction}.
      *
      * @return A new {@link ClientTransaction}
      * @throws TransactionChainClosedException if this history is closed
+     * @throws IllegalStateException if a previous dependent transaction has not been closed
      */
     public final ClientTransaction createTransaction() {
-        if (state == State.CLOSED) {
-            throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
-        }
+        checkNotClosed();
 
         synchronized (this) {
             final ClientTransaction ret = doCreateTransaction();
@@ -170,6 +179,26 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         }
     }
 
+    /**
+     * Create a new {@link ClientSnapshot}.
+     *
+     * @return A new {@link ClientSnapshot}
+     * @throws TransactionChainClosedException if this history is closed
+     * @throws IllegalStateException if a previous dependent transaction has not been closed
+     */
+    public final ClientSnapshot takeSnapshot() {
+        checkNotClosed();
+
+        synchronized (this) {
+            final ClientSnapshot ret = doCreateSnapshot();
+            openTransactions.put(ret.getIdentifier(), ret);
+            return ret;
+        }
+    }
+
+    @GuardedBy("this")
+    abstract ClientSnapshot doCreateSnapshot();
+
     @GuardedBy("this")
     abstract ClientTransaction doCreateTransaction();
 
@@ -179,10 +208,12 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
      * @param txId Transaction identifier
      * @param cohort Transaction commit cohort
      */
-    synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+    synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
             final AbstractTransactionCommitCohort cohort) {
-        final ClientTransaction tx = openTransactions.remove(txId);
-        Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
+        final TransactionIdentifier txId = tx.getIdentifier();
+        if (openTransactions.remove(txId) == null) {
+            LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
+        }
 
         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
         Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
@@ -196,11 +227,11 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
      * backend.
      *
-     * @param txId transaction identifier
+     * @param snapshot transaction identifier
      */
-    synchronized void onTransactionAbort(final TransactionIdentifier txId) {
-        if (openTransactions.remove(txId) == null) {
-            LOG.warn("Could not find aborting transaction {}", txId);
+    synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
+        if (openTransactions.remove(snapshot.getIdentifier()) == null) {
+            LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
         }
     }
 
index 3dc4dbf..202fe5b 100644 (file)
@@ -190,6 +190,11 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
         return singleHistory.createTransaction();
     }
 
+    @Override
+    public final ClientSnapshot createSnapshot() {
+        return singleHistory.doCreateSnapshot();
+    }
+
     @Override
     public final void close() {
         context().executeInActor(this::shutdown);
index 36f9a4b..22be140 100644 (file)
@@ -199,16 +199,19 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void delete(final YangInstanceIdentifier path) {
+        checkReadWrite();
         checkNotSealed();
         doDelete(path);
     }
 
     final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkReadWrite();
         checkNotSealed();
         doMerge(path, data);
     }
 
     final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkReadWrite();
         checkNotSealed();
         doWrite(path, data);
     }
@@ -265,6 +268,12 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return (SuccessorState) local;
     }
 
+    private void checkReadWrite() {
+        if (isSnapshotOnly()) {
+            throw new UnsupportedOperationException("Transaction " + getIdentifier() + " is a read-only snapshot");
+        }
+    }
+
     final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
         successfulRequests.add(Verify.verifyNotNull(req));
     }
@@ -317,6 +326,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @return Future completion
      */
     final ListenableFuture<Boolean> directCommit() {
+        checkReadWrite();
         checkSealed();
 
         // Precludes startReconnect() from interfering with the fast path
@@ -346,6 +356,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void canCommit(final VotingFuture<?> ret) {
+        checkReadWrite();
         checkSealed();
 
         // Precludes startReconnect() from interfering with the fast path
@@ -379,6 +390,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void preCommit(final VotingFuture<?> ret) {
+        checkReadWrite();
         checkSealed();
 
         final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
@@ -398,6 +410,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void doCommit(final VotingFuture<?> ret) {
+        checkReadWrite();
         checkSealed();
 
         sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
@@ -505,6 +518,8 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         }
     }
 
+    abstract boolean isSnapshotOnly();
+
     abstract void doDelete(final YangInstanceIdentifier path);
 
     abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
index ac18728..26b03e3 100644 (file)
@@ -39,38 +39,50 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A
         }
     }
 
-    @Override
-    ClientTransaction doCreateTransaction() {
+    private State ensureIdleState() {
         final State local = state();
         Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
-        updateState(local, State.TX_OPEN);
+        return local;
+    }
+
+    @Override
+    ClientSnapshot doCreateSnapshot() {
+        ensureIdleState();
+        return new ClientSnapshot(this, new TransactionIdentifier(getIdentifier(), nextTx()));
+    }
 
+    @Override
+    ClientTransaction doCreateTransaction() {
+        updateState(ensureIdleState(), State.TX_OPEN);
         return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx()));
     }
 
     @Override
-    void onTransactionAbort(final TransactionIdentifier txId) {
-        final State local = state();
-        if (local == State.TX_OPEN) {
-            updateState(local, State.IDLE);
+    void onTransactionAbort(final AbstractClientHandle<?> snap) {
+        if (snap instanceof ClientTransaction) {
+            final State local = state();
+            if (local == State.TX_OPEN) {
+                updateState(local, State.IDLE);
+            }
         }
 
-        super.onTransactionAbort(txId);
+        super.onTransactionAbort(snap);
     }
 
     @Override
-    AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+    AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
             final AbstractTransactionCommitCohort cohort) {
+
         final State local = state();
         switch (local) {
             case CLOSED:
-                return super.onTransactionReady(txId, cohort);
+                return super.onTransactionReady(tx, cohort);
             case IDLE:
                 throw new IllegalStateException(String.format("Local history %s is idle when readying transaction %s",
-                    this, txId));
+                    this, tx.getIdentifier()));
             case TX_OPEN:
                 updateState(local, State.IDLE);
-                return super.onTransactionReady(txId, cohort);
+                return super.onTransactionReady(tx, cohort);
             default:
                 throw new IllegalStateException(String.format("Local history %s in unhandled state %s", this, local));
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java
new file mode 100644 (file)
index 0000000..482d1a5
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Snapshot of the datastore state. Note this snapshot is not consistent across shards because sub-shard snapshots are
+ * created lazily.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public class ClientSnapshot extends AbstractClientHandle<AbstractProxyTransaction> {
+    // Hidden to prevent outside instantiation
+    ClientSnapshot(final AbstractClientHistory parent, final TransactionIdentifier transactionId) {
+        super(parent, transactionId);
+    }
+
+    private AbstractProxyTransaction createProxy(final Long shard) {
+        return parent().createSnapshotProxy(getIdentifier(), shard);
+    }
+
+    private AbstractProxyTransaction ensureSnapshotProxy(final YangInstanceIdentifier path) {
+        return ensureProxy(path, this::createProxy);
+    }
+
+    public final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+        return ensureSnapshotProxy(path).exists(path);
+    }
+
+    public final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+            final YangInstanceIdentifier path) {
+        return ensureSnapshotProxy(path).read(path);
+    }
+}
index abb1345..334ab71 100644 (file)
@@ -12,20 +12,15 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.CheckedFuture;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.Collection;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Client-side view of a free-standing transaction.
+ * Client-side view of a transaction.
  *
  * <p>
  * This interface is used by the world outside of the actor system and in the actor system it is manifested via
@@ -55,127 +50,61 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 @Beta
-public final class ClientTransaction extends LocalAbortable implements Identifiable<TransactionIdentifier> {
-    private static final Logger LOG = LoggerFactory.getLogger(ClientTransaction.class);
-    private static final AtomicIntegerFieldUpdater<ClientTransaction> STATE_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(ClientTransaction.class, "state");
-    private static final int OPEN_STATE = 0;
-    private static final int CLOSED_STATE = 1;
-
-    private final Map<Long, AbstractProxyTransaction> proxies = new ConcurrentHashMap<>();
-    private final TransactionIdentifier transactionId;
-    private final AbstractClientHistory parent;
-
-    private volatile int state = OPEN_STATE;
+public final class ClientTransaction extends AbstractClientHandle<AbstractProxyTransaction> {
 
     ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) {
-        this.transactionId = Preconditions.checkNotNull(transactionId);
-        this.parent = Preconditions.checkNotNull(parent);
+        super(parent, transactionId);
     }
 
-    private void checkNotClosed() {
-        Preconditions.checkState(state == OPEN_STATE, "Transaction %s is closed", transactionId);
-    }
 
     private AbstractProxyTransaction createProxy(final Long shard) {
-        return parent.createTransactionProxy(transactionId, shard);
-    }
-
-    private AbstractProxyTransaction ensureProxy(final YangInstanceIdentifier path) {
-        checkNotClosed();
-
-        final Long shard = parent.resolveShardForPath(path);
-        return proxies.computeIfAbsent(shard, this::createProxy);
+        return parent().createTransactionProxy(getIdentifier(), shard);
     }
 
-    @Override
-    public TransactionIdentifier getIdentifier() {
-        return transactionId;
+    private AbstractProxyTransaction ensureTransactionProxy(final YangInstanceIdentifier path) {
+        return ensureProxy(path, this::createProxy);
     }
 
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
-        return ensureProxy(path).exists(path);
+        return ensureTransactionProxy(path).exists(path);
     }
 
-    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
-        return ensureProxy(path).read(path);
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+            final YangInstanceIdentifier path) {
+        return ensureTransactionProxy(path).read(path);
     }
 
     public void delete(final YangInstanceIdentifier path) {
-        ensureProxy(path).delete(path);
+        ensureTransactionProxy(path).delete(path);
     }
 
     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        ensureProxy(path).merge(path, data);
+        ensureTransactionProxy(path).merge(path, data);
     }
 
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        ensureProxy(path).write(path, data);
-    }
-
-    private boolean ensureClosed() {
-        final int local = state;
-        if (local == CLOSED_STATE) {
-            return false;
-        }
-
-        final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE);
-        Preconditions.checkState(success, "Transaction %s raced during close", this);
-        return true;
+        ensureTransactionProxy(path).write(path, data);
     }
 
     public DOMStoreThreePhaseCommitCohort ready() {
-        Preconditions.checkState(ensureClosed(), "Attempted to submit a closed transaction %s", this);
-
-        for (AbstractProxyTransaction p : proxies.values()) {
-            p.seal();
-        }
+        final Collection<AbstractProxyTransaction> toReady = ensureClosed();
+        Preconditions.checkState(toReady != null, "Attempted to submit a closed transaction %s", this);
 
+        toReady.forEach(AbstractProxyTransaction::seal);
         final AbstractTransactionCommitCohort cohort;
-        switch (proxies.size()) {
+        switch (toReady.size()) {
             case 0:
-                cohort = new EmptyTransactionCommitCohort(parent, transactionId);
+                cohort = new EmptyTransactionCommitCohort(parent(), getIdentifier());
                 break;
             case 1:
-                cohort = new DirectTransactionCommitCohort(parent, transactionId,
-                    Iterables.getOnlyElement(proxies.values()));
+                cohort = new DirectTransactionCommitCohort(parent(), getIdentifier(),
+                    Iterables.getOnlyElement(toReady));
                 break;
             default:
-                cohort = new ClientTransactionCommitCohort(parent, transactionId, proxies.values());
+                cohort = new ClientTransactionCommitCohort(parent(), getIdentifier(), toReady);
                 break;
         }
 
-        return parent.onTransactionReady(transactionId, cohort);
-    }
-
-    /**
-     * Release all state associated with this transaction.
-     */
-    public void abort() {
-        if (commonAbort()) {
-            parent.onTransactionAbort(transactionId);
-        }
-    }
-
-    private boolean commonAbort() {
-        if (!ensureClosed()) {
-            return false;
-        }
-
-        for (AbstractProxyTransaction proxy : proxies.values()) {
-            proxy.abort();
-        }
-        proxies.clear();
-        return true;
-    }
-
-    @Override
-    void localAbort(final Throwable cause) {
-        LOG.debug("Local abort of transaction {}", getIdentifier(), cause);
-        commonAbort();
-    }
-
-    Map<Long, AbstractProxyTransaction> getProxies() {
-        return proxies;
+        return parent().onTransactionReady(this, cohort);
     }
 }
index 63dc87b..2032b54 100644 (file)
@@ -36,6 +36,13 @@ public interface DataStoreClient extends Identifiable<ClientIdentifier>, AutoClo
      */
     @Nonnull ClientLocalHistory createLocalHistory();
 
+    /**
+     * Create a new free-standing snapshot.
+     *
+     * @return Client snapshot handle
+     */
+    @Nonnull ClientSnapshot createSnapshot();
+
     /**
      * Create a new free-standing transaction.
      *
index e994117..f75d443 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import java.util.function.Consumer;
@@ -20,17 +19,9 @@ import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactio
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
-import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
-import org.opendaylight.controller.cluster.access.commands.TransactionModification;
-import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
@@ -38,9 +29,7 @@ import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,161 +50,56 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 @NotThreadSafe
-final class LocalProxyTransaction extends AbstractProxyTransaction {
+abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class);
 
     private final TransactionIdentifier identifier;
 
-    private CursorAwareDataTreeModification modification;
-    private CursorAwareDataTreeModification sealedModification;
-
-    LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
-        final CursorAwareDataTreeModification modification) {
+    LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
         super(parent);
         this.identifier = Preconditions.checkNotNull(identifier);
-        this.modification = Preconditions.checkNotNull(modification);
     }
 
     @Override
-    public TransactionIdentifier getIdentifier() {
+    public final TransactionIdentifier getIdentifier() {
         return identifier;
     }
 
-    @Override
-    void doDelete(final YangInstanceIdentifier path) {
-        modification.delete(path);
-    }
+    abstract DataTreeSnapshot readOnlyView();
 
-    @Override
-    void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        modification.merge(path, data);
-    }
+    abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
+            @Nullable Consumer<Response<?, ?>> callback);
 
     @Override
-    void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        modification.write(path, data);
+    final CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
+        return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent());
     }
 
     @Override
-    CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
-        return Futures.immediateCheckedFuture(modification.readNode(path).isPresent());
+    final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
+        return Futures.immediateCheckedFuture(readOnlyView().readNode(path));
     }
 
     @Override
-    CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
-        return Futures.immediateCheckedFuture(modification.readNode(path));
-    }
-
-    private RuntimeException abortedException() {
-        return new IllegalStateException("Tracker " + identifier + " has been aborted");
-    }
-
-    private RuntimeException submittedException() {
-        return new IllegalStateException("Tracker " + identifier + " has been submitted");
-    }
-
-    @Override
-    void doAbort() {
+    final void doAbort() {
         sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> {
             LOG.debug("Transaction {} abort completed with {}", identifier, response);
         });
     }
 
-    @Override
-    CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
-        final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, nextSequence(),
-            localActor(), modification, coordinated);
-        modification = new FailedDataTreeModification(this::submittedException);
-        return ret;
-    }
-
-    @Override
-    void doSeal() {
-        modification.ready();
-        sealedModification = modification;
-    }
-
-    @Override
-    void flushState(final AbstractProxyTransaction successor) {
-        sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
-            @Override
-            public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
-                successor.write(current().node(child), data);
-            }
-
-            @Override
-            public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
-                successor.merge(current().node(child), data);
-            }
-
-            @Override
-            public void delete(final PathArgument child) {
-                successor.delete(current().node(child));
-            }
-        });
-    }
-
-    DataTreeSnapshot getSnapshot() {
-        Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", identifier);
-        return sealedModification;
-    }
-
-    private void applyModifyTransactionRequest(final ModifyTransactionRequest request,
-            final @Nullable Consumer<Response<?, ?>> callback) {
-        for (TransactionModification mod : request.getModifications()) {
-            if (mod instanceof TransactionWrite) {
-                modification.write(mod.getPath(), ((TransactionWrite)mod).getData());
-            } else if (mod instanceof TransactionMerge) {
-                modification.merge(mod.getPath(), ((TransactionMerge)mod).getData());
-            } else if (mod instanceof TransactionDelete) {
-                modification.delete(mod.getPath());
-            } else {
-                throw new IllegalArgumentException("Unsupported modification " + mod);
-            }
-        }
-
-        final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
-        if (maybeProtocol.isPresent()) {
-            seal();
-            Verify.verify(callback != null, "Request {} has null callback", request);
-
-            switch (maybeProtocol.get()) {
-                case ABORT:
-                    sendAbort(callback);
-                    break;
-                case SIMPLE:
-                    sendRequest(commitRequest(false), callback);
-                    break;
-                case THREE_PHASE:
-                    sendRequest(commitRequest(true), callback);
-                    break;
-                default:
-                    throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
-            }
-        }
-    }
-
     @Override
     void handleForwardedRemoteRequest(final TransactionRequest<?> request,
             final @Nullable Consumer<Response<?, ?>> callback) {
-        LOG.debug("Applying forwarded request {}", request);
-
         if (request instanceof ModifyTransactionRequest) {
             applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
         } else if (request instanceof ReadTransactionRequest) {
             final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
-            final Optional<NormalizedNode<?, ?>> result = modification.readNode(path);
+            final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
             callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
         } else if (request instanceof ExistsTransactionRequest) {
             final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
-            final boolean result = modification.readNode(path).isPresent();
+            final boolean result = readOnlyView().readNode(path).isPresent();
             callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
-        } else if (request instanceof TransactionPreCommitRequest) {
-            sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
-        } else if (request instanceof TransactionDoCommitRequest) {
-            sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
-        } else if (request instanceof TransactionAbortRequest) {
-            sendAbort(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
@@ -263,8 +147,6 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
             final Consumer<Response<?, ?>> callback) {
         if (request instanceof AbortLocalTransactionRequest) {
             successor.sendAbort(request, callback);
-        } else if (request instanceof CommitLocalTransactionRequest) {
-            successor.sendCommit((CommitLocalTransactionRequest)request, callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
@@ -272,18 +154,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
         LOG.debug("Forwarded request {} to successor {}", request, successor);
     }
 
-    private void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+    void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         sendRequest(request, callback);
-        modification = new FailedDataTreeModification(this::abortedException);
-    }
-
-    private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
-        // Rebase old modification on new data tree.
-        try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) {
-            request.getModification().applyToCursor(cursor);
-        }
-
-        seal();
-        sendRequest(commitRequest(request.isCoordinated()), callback);
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
new file mode 100644 (file)
index 0000000..eb2362d
--- /dev/null
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A read-only specialization of {@link LocalProxyTransaction}.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalReadOnlyProxyTransaction.class);
+
+    private final DataTreeSnapshot snapshot;
+
+    LocalReadOnlyProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
+        final DataTreeSnapshot snapshot) {
+        super(parent, identifier);
+        this.snapshot = Preconditions.checkNotNull(snapshot);
+    }
+
+    @Override
+    boolean isSnapshotOnly() {
+        return true;
+    }
+
+    @Override
+    DataTreeSnapshot readOnlyView() {
+        return snapshot;
+    }
+
+    @Override
+    void doDelete(final YangInstanceIdentifier path) {
+        throw new UnsupportedOperationException("Read-only snapshot");
+    }
+
+    @Override
+    void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        throw new UnsupportedOperationException("Read-only snapshot");
+    }
+
+    @Override
+    void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        throw new UnsupportedOperationException("Read-only snapshot");
+    }
+
+    @Override
+    CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
+        throw new UnsupportedOperationException("Read-only snapshot");
+    }
+
+    @Override
+    void doSeal() {
+        // No-op
+    }
+
+    @Override
+    void flushState(final AbstractProxyTransaction successor) {
+        // No-op
+    }
+
+    @Override
+    void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+            final Consumer<Response<?, ?>> callback) {
+        Verify.verify(request.getModifications().isEmpty());
+
+        final PersistenceProtocol protocol = request.getPersistenceProtocol().get();
+        Verify.verify(protocol == PersistenceProtocol.ABORT);
+        abort();
+    }
+
+    @Override
+    void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) {
+        if (request instanceof CommitLocalTransactionRequest) {
+            final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
+            final DataTreeModification mod = req.getModification();
+
+            LOG.debug("Applying modification {} to successor {}", mod, successor);
+            mod.applyToCursor(new AbstractDataTreeModificationCursor() {
+                @Override
+                public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+                    successor.write(current().node(child), data);
+                }
+
+                @Override
+                public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+                    successor.merge(current().node(child), data);
+                }
+
+                @Override
+                public void delete(final PathArgument child) {
+                    successor.delete(current().node(child));
+                }
+            });
+
+            successor.seal();
+
+            final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
+            successor.sendRequest(successorReq, callback);
+        } else if (request instanceof AbortLocalTransactionRequest) {
+            LOG.debug("Forwarding abort {} to successor {}", request, successor);
+            successor.abort();
+        } else {
+            throw new IllegalArgumentException("Unhandled request" + request);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
new file mode 100644 (file)
index 0000000..8195f8d
--- /dev/null
@@ -0,0 +1,257 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with
+ * the client instance.
+ *
+ * <p>
+ * It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations
+ * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard
+ * leader.
+ *
+ * <p>
+ * This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the
+ * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalReadWriteProxyTransaction.class);
+
+    private CursorAwareDataTreeModification modification;
+    private CursorAwareDataTreeModification sealedModification;
+
+    LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
+        final DataTreeSnapshot snapshot) {
+        super(parent, identifier);
+        this.modification = (CursorAwareDataTreeModification) snapshot.newModification();
+    }
+
+    @Override
+    boolean isSnapshotOnly() {
+        return false;
+    }
+
+    @Override
+    CursorAwareDataTreeSnapshot readOnlyView() {
+        return modification;
+    }
+
+    @Override
+    void doDelete(final YangInstanceIdentifier path) {
+        modification.delete(path);
+    }
+
+    @Override
+    void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        modification.merge(path, data);
+    }
+
+    @Override
+    void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        modification.write(path, data);
+    }
+
+    private RuntimeException abortedException() {
+        return new IllegalStateException("Tracker " + getIdentifier() + " has been aborted");
+    }
+
+    private RuntimeException submittedException() {
+        return new IllegalStateException("Tracker " + getIdentifier() + " has been submitted");
+    }
+
+    @Override
+    CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
+        final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(getIdentifier(), nextSequence(),
+            localActor(), modification, coordinated);
+        modification = new FailedDataTreeModification(this::submittedException);
+        return ret;
+    }
+
+    @Override
+    void doSeal() {
+        modification.ready();
+        sealedModification = modification;
+    }
+
+    @Override
+    void flushState(final AbstractProxyTransaction successor) {
+        sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
+            @Override
+            public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+                successor.write(current().node(child), data);
+            }
+
+            @Override
+            public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+                successor.merge(current().node(child), data);
+            }
+
+            @Override
+            public void delete(final PathArgument child) {
+                successor.delete(current().node(child));
+            }
+        });
+    }
+
+    DataTreeSnapshot getSnapshot() {
+        Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", getIdentifier());
+        return sealedModification;
+    }
+
+    @Override
+    void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+            final @Nullable Consumer<Response<?, ?>> callback) {
+        for (TransactionModification mod : request.getModifications()) {
+            if (mod instanceof TransactionWrite) {
+                modification.write(mod.getPath(), ((TransactionWrite)mod).getData());
+            } else if (mod instanceof TransactionMerge) {
+                modification.merge(mod.getPath(), ((TransactionMerge)mod).getData());
+            } else if (mod instanceof TransactionDelete) {
+                modification.delete(mod.getPath());
+            } else {
+                throw new IllegalArgumentException("Unsupported modification " + mod);
+            }
+        }
+
+        final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
+        if (maybeProtocol.isPresent()) {
+            seal();
+            Verify.verify(callback != null, "Request {} has null callback", request);
+
+            switch (maybeProtocol.get()) {
+                case ABORT:
+                    sendAbort(callback);
+                    break;
+                case SIMPLE:
+                    sendRequest(commitRequest(false), callback);
+                    break;
+                case THREE_PHASE:
+                    sendRequest(commitRequest(true), callback);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
+            }
+        }
+    }
+
+    @Override
+    void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+            final @Nullable Consumer<Response<?, ?>> callback) {
+        LOG.debug("Applying forwarded request {}", request);
+
+        if (request instanceof TransactionPreCommitRequest) {
+            sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+        } else if (request instanceof TransactionDoCommitRequest) {
+            sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+        } else if (request instanceof TransactionAbortRequest) {
+            sendAbort(callback);
+        } else {
+            super.handleForwardedRemoteRequest(request, callback);
+        }
+    }
+
+    @Override
+    void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) {
+        if (request instanceof CommitLocalTransactionRequest) {
+            final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
+            final DataTreeModification mod = req.getModification();
+
+            LOG.debug("Applying modification {} to successor {}", mod, successor);
+            mod.applyToCursor(new AbstractDataTreeModificationCursor() {
+                @Override
+                public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+                    successor.write(current().node(child), data);
+                }
+
+                @Override
+                public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+                    successor.merge(current().node(child), data);
+                }
+
+                @Override
+                public void delete(final PathArgument child) {
+                    successor.delete(current().node(child));
+                }
+            });
+
+            successor.seal();
+
+            final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
+            successor.sendRequest(successorReq, callback);
+        } else if (request instanceof AbortLocalTransactionRequest) {
+            LOG.debug("Forwarding abort {} to successor {}", request, successor);
+            successor.abort();
+        } else {
+            throw new IllegalArgumentException("Unhandled request" + request);
+        }
+    }
+
+    @Override
+    void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) {
+        if (request instanceof CommitLocalTransactionRequest) {
+            Verify.verify(successor instanceof LocalReadWriteProxyTransaction);
+            ((LocalReadWriteProxyTransaction) successor).sendCommit((CommitLocalTransactionRequest)request, callback);
+            LOG.debug("Forwarded request {} to successor {}", request, successor);
+        } else {
+            super.forwardToLocal(successor, request, callback);
+        }
+    }
+
+    @Override
+    void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        super.sendAbort(request, callback);
+        modification = new FailedDataTreeModification(this::abortedException);
+    }
+
+    private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+        // Rebase old modification on new data tree.
+        try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) {
+            request.getModification().applyToCursor(cursor);
+        }
+
+        seal();
+        sendRequest(commitRequest(request.isCoordinated()), callback);
+    }
+}
index b3b604b..846f5c3 100644 (file)
@@ -32,7 +32,6 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
@@ -66,20 +65,21 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         final AbstractProxyTransaction doCreateTransactionProxy(
-                final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
-            return new RemoteProxyTransaction(this, txId);
+                final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId,
+                final boolean snapshotOnly) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly);
         }
     }
 
     private static final class Local extends AbstractLocal {
-        private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
-                AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
+        private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
 
         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
         // the open one and attempts to create a new transaction again.
-        private LocalProxyTransaction lastOpen;
+        private LocalReadWriteProxyTransaction lastOpen;
 
-        private volatile LocalProxyTransaction lastSealed;
+        private volatile LocalReadWriteProxyTransaction lastSealed;
 
         Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
             final DataTree dataTree) {
@@ -88,11 +88,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId) {
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
 
             // onTransactionCompleted() runs concurrently
-            final LocalProxyTransaction localSealed = lastSealed;
+            final LocalReadWriteProxyTransaction localSealed = lastSealed;
             final DataTreeSnapshot baseSnapshot;
             if (localSealed != null) {
                 baseSnapshot = localSealed.getSnapshot();
@@ -100,8 +100,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 baseSnapshot = takeSnapshot();
             }
 
-            lastOpen = new LocalProxyTransaction(this, txId,
-                (CursorAwareDataTreeModification) baseSnapshot.newModification());
+            if (snapshotOnly) {
+                return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
+            }
+
+            lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
             return lastOpen;
         }
@@ -113,16 +116,18 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         void onTransactionAborted(final AbstractProxyTransaction tx) {
-            Preconditions.checkState(tx.equals(lastOpen));
-            lastOpen = null;
+            if (tx.equals(lastOpen)) {
+                lastOpen = null;
+            }
         }
 
         @Override
         void onTransactionCompleted(final AbstractProxyTransaction tx) {
             Verify.verify(tx instanceof LocalProxyTransaction);
-
-            if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
-                LOG.debug("Completed last sealed transaction {}", tx);
+            if (tx instanceof LocalReadWriteProxyTransaction) {
+                if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
+                    LOG.debug("Completed last sealed transaction {}", tx);
+                }
             }
         }
 
@@ -142,9 +147,10 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId) {
-            return new LocalProxyTransaction(this, txId,
-                (CursorAwareDataTreeModification) takeSnapshot().newModification());
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
+            final DataTreeSnapshot snapshot = takeSnapshot();
+            return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
+                new LocalReadWriteProxyTransaction(this, txId, snapshot);
         }
 
         @Override
@@ -212,7 +218,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
             for (AbstractProxyTransaction t : proxies.values()) {
                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
-                final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
+                final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
+                    t.isSnapshotOnly());
                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
                 t.replayMessages(newProxy, previousEntries);
             }
@@ -311,15 +318,16 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return connection.localActor();
     }
 
-    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
+    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+            final boolean snapshotOnly) {
         lock.lock();
         try {
             if (successor != null) {
-                return successor.createTransactionProxy(txId);
+                return successor.createTransactionProxy(txId, snapshotOnly);
             }
 
             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
-            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
+            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
             proxies.put(proxyId, ret);
             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
             return ret;
@@ -356,7 +364,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
     @GuardedBy("lock")
     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
-            TransactionIdentifier txId);
+            TransactionIdentifier txId, boolean snapshotOnly);
 
     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
 
index 783096b..1429ec5 100644 (file)
@@ -63,16 +63,24 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     private static final int REQUEST_MAX_MODIFICATIONS = 1000;
 
     private final ModifyTransactionRequestBuilder builder;
+    private final boolean snapshotOnly;
 
     private boolean builderBusy;
 
     private volatile Exception operationFailure;
 
-    RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
+    RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
+            final boolean snapshotOnly) {
         super(parent);
+        this.snapshotOnly = snapshotOnly;
         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
     }
 
+    @Override
+    boolean isSnapshotOnly() {
+        return snapshotOnly;
+    }
+
     @Override
     public TransactionIdentifier getIdentifier() {
         return builder.getIdentifier();
@@ -110,15 +118,15 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     @Override
     CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
         final SettableFuture<Boolean> future = SettableFuture.create();
-        return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path),
-            t -> completeExists(future, t), future);
+        return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
+            isSnapshotOnly()), t -> completeExists(future, t), future);
     }
 
     @Override
     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
         final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
-        return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path),
-            t -> completeRead(future, t), future);
+        return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
+            isSnapshotOnly()), t -> completeRead(future, t), future);
     }
 
     @Override
@@ -302,11 +310,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         } else if (request instanceof ReadTransactionRequest) {
             ensureFlushedBuider();
             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
-                ((ReadTransactionRequest) request).getPath()), callback);
+                ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
         } else if (request instanceof ExistsTransactionRequest) {
             ensureFlushedBuider();
             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
-                ((ExistsTransactionRequest) request).getPath()), callback);
+                ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
         } else if (request instanceof TransactionPreCommitRequest) {
             ensureFlushedBuider();
             sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
index c04c9c5..8220bfa 100644 (file)
@@ -25,6 +25,14 @@ final class SingleClientHistory extends AbstractClientHistory {
         super(client, identifier);
     }
 
+    @Override
+    ClientSnapshot doCreateSnapshot() {
+        final TransactionIdentifier txId = new TransactionIdentifier(getIdentifier(), nextTx());
+        LOG.debug("{}: creating a new snapshot {}", this, txId);
+
+        return new ClientSnapshot(this, txId);
+    }
+
     @Override
     ClientTransaction doCreateTransaction() {
         final TransactionIdentifier txId = new TransactionIdentifier(getIdentifier(), nextTx());
index 1adca56..7c2ddb0 100644 (file)
@@ -13,6 +13,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
@@ -67,14 +68,7 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
                 throw UNSEQUENCED_START;
             }
 
-            if (request instanceof CommitLocalTransactionRequest) {
-                tx = createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
-                LOG.debug("{}: allocated new ready transaction {}", persistenceId(), id);
-            } else {
-                tx = createOpenTransaction(id);
-                LOG.debug("{}: allocated new open transaction {}", persistenceId(), id);
-            }
-
+            tx = createTransaction(request, id);
             transactions.put(id, tx);
         } else {
             final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
@@ -88,6 +82,25 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
         return tx.handleRequest(request, envelope, now);
     }
 
+    private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
+            throws RequestException {
+        if (request instanceof CommitLocalTransactionRequest) {
+            LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id);
+            return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
+        }
+        if (request instanceof AbstractReadTransactionRequest) {
+            if (((AbstractReadTransactionRequest<?>) request).isSnapshotOnly()) {
+                LOG.debug("{}: allocatint new open snapshot {}", persistenceId(), id);
+                return createOpenSnapshot(id);
+            }
+        }
+
+        LOG.debug("{}: allocating new open transaction {}", persistenceId(), id);
+        return createOpenTransaction(id);
+    }
+
+    abstract FrontendTransaction createOpenSnapshot(TransactionIdentifier id) throws RequestException;
+
     abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException;
 
     abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod)
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java
new file mode 100644 (file)
index 0000000..36a876b
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Read-only frontend transaction state as observed by the shard leader.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class FrontendReadOnlyTransaction extends FrontendTransaction {
+    private final ReadOnlyShardDataTreeTransaction openTransaction;
+
+    private FrontendReadOnlyTransaction(final AbstractFrontendHistory history,
+            final ReadOnlyShardDataTreeTransaction transaction) {
+        super(history, transaction.getIdentifier());
+        this.openTransaction = Preconditions.checkNotNull(transaction);
+    }
+
+    static FrontendReadOnlyTransaction create(final AbstractFrontendHistory history,
+            final ReadOnlyShardDataTreeTransaction transaction) {
+        return new FrontendReadOnlyTransaction(history, transaction);
+    }
+
+    // Sequence has already been checked
+    @Override
+    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
+        if (request instanceof ExistsTransactionRequest) {
+            return handleExistsTransaction((ExistsTransactionRequest) request);
+        } else if (request instanceof ReadTransactionRequest) {
+            return handleReadTransaction((ReadTransactionRequest) request);
+        } else if (request instanceof TransactionAbortRequest) {
+            return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+        } else {
+            throw new UnsupportedRequestException(request);
+        }
+    }
+
+    private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        openTransaction.abort();
+        return new TransactionAbortSuccess(openTransaction.getIdentifier(), request.getSequence());
+    }
+
+    private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
+            throws RequestException {
+        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(openTransaction.getIdentifier(),
+            request.getSequence(), data.isPresent()));
+    }
+
+    private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
+            throws RequestException {
+        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        return recordSuccess(request.getSequence(), new ReadTransactionSuccess(openTransaction.getIdentifier(),
+            request.getSequence(), data));
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java
new file mode 100644 (file)
index 0000000..79d555c
--- /dev/null
@@ -0,0 +1,310 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Frontend read-write transaction state as observed by the shard leader.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class FrontendReadWriteTransaction extends FrontendTransaction {
+    private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
+
+    private ReadWriteShardDataTreeTransaction openTransaction;
+    private DataTreeModification sealedModification;
+    private ShardDataTreeCohort readyCohort;
+
+    private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
+            final ReadWriteShardDataTreeTransaction transaction) {
+        super(history, id);
+        this.openTransaction = Preconditions.checkNotNull(transaction);
+    }
+
+    private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
+            final DataTreeModification mod) {
+        super(history, id);
+        this.sealedModification = Preconditions.checkNotNull(mod);
+    }
+
+    static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history,
+            final ReadWriteShardDataTreeTransaction transaction) {
+        return new FrontendReadWriteTransaction(history, transaction.getIdentifier(), transaction);
+    }
+
+    static FrontendReadWriteTransaction createReady(final AbstractFrontendHistory history,
+            final TransactionIdentifier id, final DataTreeModification mod) {
+        return new FrontendReadWriteTransaction(history, id, mod);
+    }
+
+    // Sequence has already been checked
+    @Override
+    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
+        if (request instanceof ModifyTransactionRequest) {
+            return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
+        } else if (request instanceof CommitLocalTransactionRequest) {
+            handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
+            return null;
+        } else if (request instanceof ExistsTransactionRequest) {
+            return handleExistsTransaction((ExistsTransactionRequest) request);
+        } else if (request instanceof ReadTransactionRequest) {
+            return handleReadTransaction((ReadTransactionRequest) request);
+        } else if (request instanceof TransactionPreCommitRequest) {
+            handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
+            return null;
+        } else if (request instanceof TransactionDoCommitRequest) {
+            handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
+            return null;
+        } else if (request instanceof TransactionAbortRequest) {
+            return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+        } else {
+            throw new UnsupportedRequestException(request);
+        }
+    }
+
+    private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+            @Override
+            public void onSuccess(final DataTreeCandidate result) {
+                recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
+                    request.getSequence()));
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
+                readyCohort = null;
+            }
+        });
+    }
+
+    private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
+        readyCohort.commit(new FutureCallback<UnsignedLong>() {
+            @Override
+            public void onSuccess(final UnsignedLong result) {
+                successfulCommit(envelope, now);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
+                readyCohort = null;
+            }
+        });
+    }
+
+    private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        if (readyCohort == null) {
+            openTransaction.abort();
+            return new TransactionAbortSuccess(getIdentifier(), request.getSequence());
+        }
+
+        readyCohort.abort(new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                readyCohort = null;
+                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+                    request.getSequence()));
+                LOG.debug("Transaction {} aborted", getIdentifier());
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                readyCohort = null;
+                LOG.warn("Transaction {} abort failed", getIdentifier(), failure);
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
+            }
+        });
+        return null;
+    }
+
+    private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
+        readyCohort.canCommit(new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
+                    envelope.getMessage().getSequence()));
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+                readyCohort = null;
+            }
+        });
+    }
+
+    private void directCommit(final RequestEnvelope envelope, final long now) {
+        readyCohort.canCommit(new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                successfulDirectCanCommit(envelope, now);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+                readyCohort = null;
+            }
+        });
+
+    }
+
+    private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
+        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+            @Override
+            public void onSuccess(final DataTreeCandidate result) {
+                successfulDirectPreCommit(envelope, startTime);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
+                readyCohort = null;
+            }
+        });
+    }
+
+    private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
+        readyCohort.commit(new FutureCallback<UnsignedLong>() {
+
+            @Override
+            public void onSuccess(final UnsignedLong result) {
+                successfulCommit(envelope, startTime);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
+                readyCohort = null;
+            }
+        });
+    }
+
+    private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+        recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+            envelope.getMessage().getSequence()));
+        readyCohort = null;
+    }
+
+    private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        if (sealedModification.equals(request.getModification())) {
+            readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+
+            if (request.isCoordinated()) {
+                coordinatedCommit(envelope, now);
+            } else {
+                directCommit(envelope, now);
+            }
+        } else {
+            throw new UnsupportedRequestException(request);
+        }
+    }
+
+    private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
+            throws RequestException {
+        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(),
+            data.isPresent()));
+    }
+
+    private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
+            throws RequestException {
+        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(),
+            data));
+    }
+
+    private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
+        return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence));
+    }
+
+    private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+
+        final DataTreeModification modification = openTransaction.getSnapshot();
+        for (TransactionModification m : request.getModifications()) {
+            if (m instanceof TransactionDelete) {
+                modification.delete(m.getPath());
+            } else if (m instanceof TransactionWrite) {
+                modification.write(m.getPath(), ((TransactionWrite) m).getData());
+            } else if (m instanceof TransactionMerge) {
+                modification.merge(m.getPath(), ((TransactionMerge) m).getData());
+            } else {
+                LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+            }
+        }
+
+        final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
+        if (!maybeProto.isPresent()) {
+            return replyModifySuccess(request.getSequence());
+        }
+
+        switch (maybeProto.get()) {
+            case ABORT:
+                openTransaction.abort();
+                openTransaction = null;
+                return replyModifySuccess(request.getSequence());
+            case SIMPLE:
+                readyCohort = openTransaction.ready();
+                openTransaction = null;
+                directCommit(envelope, now);
+                return null;
+            case THREE_PHASE:
+                readyCohort = openTransaction.ready();
+                openTransaction = null;
+                coordinatedCommit(envelope, now);
+                return null;
+            default:
+                throw new UnsupportedRequestException(request);
+        }
+    }
+}
index 9240aab..e4dd00b 100644 (file)
@@ -7,58 +7,29 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.FutureCallback;
 import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.Queue;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
-import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
-import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
-import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
-import org.opendaylight.controller.cluster.access.commands.TransactionModification;
-import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.concepts.Identifiable;
 
 /**
- * Frontend transaction state as observed by the shard leader.
+ * Frontend common transaction state as observed by the shard leader.
  *
  * @author Robert Varga
  */
 @NotThreadSafe
-final class FrontendTransaction {
-    private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class);
-
+abstract class FrontendTransaction implements Identifiable<TransactionIdentifier> {
     private final AbstractFrontendHistory history;
     private final TransactionIdentifier id;
 
@@ -72,35 +43,21 @@ final class FrontendTransaction {
     private Long lastPurgedSequence;
     private long expectedSequence;
 
-    private ReadWriteShardDataTreeTransaction openTransaction;
-    private DataTreeModification sealedModification;
-    private ShardDataTreeCohort readyCohort;
-
-    private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
-            final ReadWriteShardDataTreeTransaction transaction) {
+    FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id) {
         this.history = Preconditions.checkNotNull(history);
         this.id = Preconditions.checkNotNull(id);
-        this.openTransaction = Preconditions.checkNotNull(transaction);
     }
 
-    private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
-            final DataTreeModification mod) {
-        this.history = Preconditions.checkNotNull(history);
-        this.id = Preconditions.checkNotNull(id);
-        this.sealedModification = Preconditions.checkNotNull(mod);
+    @Override
+    public final TransactionIdentifier getIdentifier() {
+        return id;
     }
 
-    static FrontendTransaction createOpen(final AbstractFrontendHistory history,
-            final ReadWriteShardDataTreeTransaction transaction) {
-        return new FrontendTransaction(history, transaction.getIdentifier(), transaction);
+    final AbstractFrontendHistory history() {
+        return history;
     }
 
-    static FrontendTransaction createReady(final AbstractFrontendHistory history, final TransactionIdentifier id,
-            final DataTreeModification mod) {
-        return new FrontendTransaction(history, id, mod);
-    }
-
-    java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
+    final java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
         // Fast path check: if the requested sequence is the next request, bail early
         if (expectedSequence == sequence) {
             return java.util.Optional.empty();
@@ -144,36 +101,15 @@ final class FrontendTransaction {
         return java.util.Optional.empty();
     }
 
-    void purgeSequencesUpTo(final long sequence) {
+    final void purgeSequencesUpTo(final long sequence) {
         // FIXME: implement this
 
         lastPurgedSequence = sequence;
     }
 
     // Sequence has already been checked
-    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
-            final long now) throws RequestException {
-        if (request instanceof ModifyTransactionRequest) {
-            return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
-        } else if (request instanceof CommitLocalTransactionRequest) {
-            handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
-            return null;
-        } else if (request instanceof ExistsTransactionRequest) {
-            return handleExistsTransaction((ExistsTransactionRequest) request);
-        } else if (request instanceof ReadTransactionRequest) {
-            return handleReadTransaction((ReadTransactionRequest) request);
-        } else if (request instanceof TransactionPreCommitRequest) {
-            handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
-            return null;
-        } else if (request instanceof TransactionDoCommitRequest) {
-            handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
-            return null;
-        } else if (request instanceof TransactionAbortRequest) {
-            return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
-        } else {
-            throw new UnsupportedRequestException(request);
-        }
-    }
+    abstract @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request,
+            final RequestEnvelope envelope, final long now) throws RequestException;
 
     private void recordResponse(final long sequence, final Object response) {
         if (replayQueue.isEmpty()) {
@@ -183,7 +119,7 @@ final class FrontendTransaction {
         expectedSequence++;
     }
 
-    private <T extends TransactionSuccess<?>> T recordSuccess(final long sequence, final T success) {
+    final <T extends TransactionSuccess<?>> T recordSuccess(final long sequence, final T success) {
         recordResponse(sequence, success);
         return success;
     }
@@ -192,215 +128,15 @@ final class FrontendTransaction {
         return history.readTime() - startTime;
     }
 
-    private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
+    final void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
             final TransactionSuccess<?> success) {
         recordResponse(success.getSequence(), success);
         envelope.sendSuccess(success, executionTime(startTime));
     }
 
-    private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
+    final void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
             final RuntimeRequestException failure) {
         recordResponse(envelope.getMessage().getSequence(), failure);
         envelope.sendFailure(failure, executionTime(startTime));
     }
-
-    private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
-            @Override
-            public void onSuccess(final DataTreeCandidate result) {
-                recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
-                    request.getSequence()));
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
-                readyCohort = null;
-            }
-        });
-    }
-
-    private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
-            final long now) throws RequestException {
-        readyCohort.commit(new FutureCallback<UnsignedLong>() {
-            @Override
-            public void onSuccess(final UnsignedLong result) {
-                successfulCommit(envelope, now);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
-                readyCohort = null;
-            }
-        });
-    }
-
-    private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        if (readyCohort == null) {
-            openTransaction.abort();
-            return new TransactionAbortSuccess(id, request.getSequence());
-        }
-
-        readyCohort.abort(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                readyCohort = null;
-                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence()));
-                LOG.debug("Transaction {} aborted", id);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                readyCohort = null;
-                LOG.warn("Transaction {} abort failed", id, failure);
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
-            }
-        });
-        return null;
-    }
-
-    private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
-        readyCohort.canCommit(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
-                    envelope.getMessage().getSequence()));
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
-                readyCohort = null;
-            }
-        });
-    }
-
-    private void directCommit(final RequestEnvelope envelope, final long now) {
-        readyCohort.canCommit(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                successfulDirectCanCommit(envelope, now);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
-                readyCohort = null;
-            }
-        });
-
-    }
-
-    private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
-        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
-            @Override
-            public void onSuccess(final DataTreeCandidate result) {
-                successfulDirectPreCommit(envelope, startTime);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
-                readyCohort = null;
-            }
-        });
-    }
-
-    private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
-        readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
-            @Override
-            public void onSuccess(final UnsignedLong result) {
-                successfulCommit(envelope, startTime);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
-                readyCohort = null;
-            }
-        });
-    }
-
-    private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
-        recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
-            envelope.getMessage().getSequence()));
-        readyCohort = null;
-    }
-
-    private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        if (sealedModification.equals(request.getModification())) {
-            readyCohort = history.createReadyCohort(id, sealedModification);
-
-            if (request.isCoordinated()) {
-                coordinatedCommit(envelope, now);
-            } else {
-                directCommit(envelope, now);
-            }
-        } else {
-            throw new UnsupportedRequestException(request);
-        }
-    }
-
-    private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
-            throws RequestException {
-        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
-        return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(),
-            data.isPresent()));
-    }
-
-    private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
-            throws RequestException {
-        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
-        return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
-    }
-
-    private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
-        return recordSuccess(sequence, new ModifyTransactionSuccess(id, sequence));
-    }
-
-    private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-
-        final DataTreeModification modification = openTransaction.getSnapshot();
-        for (TransactionModification m : request.getModifications()) {
-            if (m instanceof TransactionDelete) {
-                modification.delete(m.getPath());
-            } else if (m instanceof TransactionWrite) {
-                modification.write(m.getPath(), ((TransactionWrite) m).getData());
-            } else if (m instanceof TransactionMerge) {
-                modification.merge(m.getPath(), ((TransactionMerge) m).getData());
-            } else {
-                LOG.warn("{}: ignoring unhandled modification {}", history.persistenceId(), m);
-            }
-        }
-
-        final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
-        if (!maybeProto.isPresent()) {
-            return replyModifySuccess(request.getSequence());
-        }
-
-        switch (maybeProto.get()) {
-            case ABORT:
-                openTransaction.abort();
-                openTransaction = null;
-                return replyModifySuccess(request.getSequence());
-            case SIMPLE:
-                readyCohort = openTransaction.ready();
-                openTransaction = null;
-                directCommit(envelope, now);
-                return null;
-            case THREE_PHASE:
-                readyCohort = openTransaction.ready();
-                openTransaction = null;
-                coordinatedCommit(envelope, now);
-                return null;
-            default:
-                throw new UnsupportedRequestException(request);
-        }
-    }
 }
index 50d1dc5..cd0cc30 100644 (file)
@@ -47,11 +47,18 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
         return chain.getIdentifier();
     }
 
+    @Override
+    FrontendTransaction createOpenSnapshot(final TransactionIdentifier id) throws RequestException {
+        checkDeadTransaction(id);
+        lastSeenTransaction = id.getTransactionId();
+        return FrontendReadOnlyTransaction.create(this, chain.newReadOnlyTransaction(id));
+    }
+
     @Override
     FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException {
         checkDeadTransaction(id);
         lastSeenTransaction = id.getTransactionId();
-        return FrontendTransaction.createOpen(this, chain.newReadWriteTransaction(id));
+        return FrontendReadWriteTransaction.createOpen(this, chain.newReadWriteTransaction(id));
     }
 
     @Override
@@ -59,7 +66,7 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
             throws RequestException {
         checkDeadTransaction(id);
         lastSeenTransaction = id.getTransactionId();
-        return FrontendTransaction.createReady(this, id, mod);
+        return FrontendReadWriteTransaction.createReady(this, id, mod);
     }
 
     @Override
index 14b0eec..fe2588d 100644 (file)
@@ -37,15 +37,20 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory {
         return identifier;
     }
 
+    @Override
+    FrontendTransaction createOpenSnapshot(final TransactionIdentifier id) throws RequestException {
+        return FrontendReadOnlyTransaction.create(this, tree.newReadOnlyTransaction(id));
+    }
+
     @Override
     FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException {
-        return FrontendTransaction.createOpen(this, tree.newReadWriteTransaction(id));
+        return FrontendReadWriteTransaction.createOpen(this, tree.newReadWriteTransaction(id));
     }
 
     @Override
     FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod)
             throws RequestException {
-        return FrontendTransaction.createReady(this, id, mod);
+        return FrontendReadWriteTransaction.createReady(this, id, mod);
     }
 
     @Override

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.