BUG-5280: add basic concept of ClientSnapshot
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index b3b604b7f08ad7e42abb4a224626673b97b8b8e3..846f5c37cfd506eb28f9e01139b0333851492281 100644 (file)
@@ -32,7 +32,6 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
@@ -66,20 +65,21 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         final AbstractProxyTransaction doCreateTransactionProxy(
-                final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
-            return new RemoteProxyTransaction(this, txId);
+                final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId,
+                final boolean snapshotOnly) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly);
         }
     }
 
     private static final class Local extends AbstractLocal {
-        private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
-                AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
+        private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
 
         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
         // the open one and attempts to create a new transaction again.
-        private LocalProxyTransaction lastOpen;
+        private LocalReadWriteProxyTransaction lastOpen;
 
-        private volatile LocalProxyTransaction lastSealed;
+        private volatile LocalReadWriteProxyTransaction lastSealed;
 
         Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
             final DataTree dataTree) {
@@ -88,11 +88,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId) {
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
 
             // onTransactionCompleted() runs concurrently
-            final LocalProxyTransaction localSealed = lastSealed;
+            final LocalReadWriteProxyTransaction localSealed = lastSealed;
             final DataTreeSnapshot baseSnapshot;
             if (localSealed != null) {
                 baseSnapshot = localSealed.getSnapshot();
@@ -100,8 +100,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 baseSnapshot = takeSnapshot();
             }
 
-            lastOpen = new LocalProxyTransaction(this, txId,
-                (CursorAwareDataTreeModification) baseSnapshot.newModification());
+            if (snapshotOnly) {
+                return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
+            }
+
+            lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
             return lastOpen;
         }
@@ -113,16 +116,18 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         void onTransactionAborted(final AbstractProxyTransaction tx) {
-            Preconditions.checkState(tx.equals(lastOpen));
-            lastOpen = null;
+            if (tx.equals(lastOpen)) {
+                lastOpen = null;
+            }
         }
 
         @Override
         void onTransactionCompleted(final AbstractProxyTransaction tx) {
             Verify.verify(tx instanceof LocalProxyTransaction);
-
-            if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
-                LOG.debug("Completed last sealed transaction {}", tx);
+            if (tx instanceof LocalReadWriteProxyTransaction) {
+                if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
+                    LOG.debug("Completed last sealed transaction {}", tx);
+                }
             }
         }
 
@@ -142,9 +147,10 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId) {
-            return new LocalProxyTransaction(this, txId,
-                (CursorAwareDataTreeModification) takeSnapshot().newModification());
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
+            final DataTreeSnapshot snapshot = takeSnapshot();
+            return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
+                new LocalReadWriteProxyTransaction(this, txId, snapshot);
         }
 
         @Override
@@ -212,7 +218,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
             for (AbstractProxyTransaction t : proxies.values()) {
                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
-                final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
+                final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
+                    t.isSnapshotOnly());
                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
                 t.replayMessages(newProxy, previousEntries);
             }
@@ -311,15 +318,16 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return connection.localActor();
     }
 
-    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
+    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+            final boolean snapshotOnly) {
         lock.lock();
         try {
             if (successor != null) {
-                return successor.createTransactionProxy(txId);
+                return successor.createTransactionProxy(txId, snapshotOnly);
             }
 
             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
-            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
+            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
             proxies.put(proxyId, ret);
             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
             return ret;
@@ -356,7 +364,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
     @GuardedBy("lock")
     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
-            TransactionIdentifier txId);
+            TransactionIdentifier txId, boolean snapshotOnly);
 
     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);