BUG-8372: fix abort message confusion
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index 5257c6c7f6299bcfd85e08f67711256dbe906c63..827c19e526fcbd5f6b3cc1c33dfdc40698af11e9 100644 (file)
@@ -22,7 +22,11 @@ import java.util.function.Consumer;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
@@ -30,7 +34,6 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
@@ -45,9 +48,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     private abstract static class AbstractLocal extends ProxyHistory {
         private final DataTree dataTree;
 
-        AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
+        AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
-            super(connection, identifier);
+            super(parent, connection, identifier);
             this.dataTree = Preconditions.checkNotNull(dataTree);
         }
 
@@ -57,40 +60,34 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     }
 
     private abstract static class AbstractRemote extends ProxyHistory {
-        AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
+        AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
-        }
-
-        @Override
-        final AbstractProxyTransaction doCreateTransactionProxy(
-                final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
-            return new RemoteProxyTransaction(this, txId);
+            super(parent, connection, identifier);
         }
     }
 
     private static final class Local extends AbstractLocal {
-        private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
-                AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
+        private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
 
         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
         // the open one and attempts to create a new transaction again.
-        private LocalProxyTransaction lastOpen;
+        private LocalReadWriteProxyTransaction lastOpen;
 
-        private volatile LocalProxyTransaction lastSealed;
+        private volatile LocalReadWriteProxyTransaction lastSealed;
 
-        Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
-            final DataTree dataTree) {
-            super(connection, identifier, dataTree);
+        Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            super(parent, connection, identifier, dataTree);
         }
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId) {
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
 
             // onTransactionCompleted() runs concurrently
-            final LocalProxyTransaction localSealed = lastSealed;
+            final LocalReadWriteProxyTransaction localSealed = lastSealed;
             final DataTreeSnapshot baseSnapshot;
             if (localSealed != null) {
                 baseSnapshot = localSealed.getSnapshot();
@@ -98,29 +95,34 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 baseSnapshot = takeSnapshot();
             }
 
-            lastOpen = new LocalProxyTransaction(this, txId,
-                (CursorAwareDataTreeModification) baseSnapshot.newModification());
+            if (snapshotOnly) {
+                return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
+            }
+
+            lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
             return lastOpen;
         }
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createClient(connection, getIdentifier());
+            return createClient(parent(), connection, getIdentifier());
         }
 
         @Override
         void onTransactionAborted(final AbstractProxyTransaction tx) {
-            Preconditions.checkState(tx.equals(lastOpen));
-            lastOpen = null;
+            if (tx.equals(lastOpen)) {
+                lastOpen = null;
+            }
         }
 
         @Override
         void onTransactionCompleted(final AbstractProxyTransaction tx) {
             Verify.verify(tx instanceof LocalProxyTransaction);
-
-            if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
-                LOG.debug("Completed last sealed transaction {}", tx);
+            if (tx instanceof LocalReadWriteProxyTransaction) {
+                if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
+                    LOG.debug("Completed last sealed transaction {}", tx);
+                }
             }
         }
 
@@ -133,44 +135,58 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     }
 
     private static final class LocalSingle extends AbstractLocal {
-        LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+        LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
-            super(connection, identifier, dataTree);
+            super(parent, connection, identifier, dataTree);
         }
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId) {
-            return new LocalProxyTransaction(this, txId,
-                (CursorAwareDataTreeModification) takeSnapshot().newModification());
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
+            final DataTreeSnapshot snapshot = takeSnapshot();
+            return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
+                new LocalReadWriteProxyTransaction(this, txId, snapshot);
         }
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createSingle(connection, getIdentifier());
+            return createSingle(parent(), connection, getIdentifier());
         }
     }
 
     private static final class Remote extends AbstractRemote {
-        Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
+        Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier) {
+            super(parent, connection, identifier);
+        }
+
+        @Override
+        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
         }
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createClient(connection, getIdentifier());
+            return createClient(parent(), connection, getIdentifier());
         }
     }
 
     private static final class RemoteSingle extends AbstractRemote {
-        RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+        RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
+            super(parent, connection, identifier);
+        }
+
+        @Override
+        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
         }
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createSingle(connection, getIdentifier());
+            return createSingle(parent(), connection, getIdentifier());
         }
     }
 
@@ -195,12 +211,34 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @GuardedBy("lock")
         @Override
-        void replaySuccessfulRequests() {
+        void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+            // First look for our Create message
+            for (ConnectionEntry e : previousEntries) {
+                final Request<?, ?> req = e.getRequest();
+                if (identifier.equals(req.getTarget())) {
+                    Verify.verify(req instanceof LocalHistoryRequest);
+                    if (req instanceof CreateLocalHistoryRequest) {
+                        successor.connection.sendRequest(req, e.getCallback());
+                        break;
+                    }
+                }
+            }
+
             for (AbstractProxyTransaction t : proxies.values()) {
                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
-                final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
+                final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
+                    t.isSnapshotOnly());
                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
-                t.replaySuccessfulRequests(newProxy);
+                t.replayMessages(newProxy, previousEntries);
+            }
+
+            // Now look for any finalizing messages
+            for (ConnectionEntry e : previousEntries) {
+                final Request<?, ?> req = e.getRequest();
+                if (identifier.equals(req.getTarget())) {
+                    Verify.verify(req instanceof LocalHistoryRequest);
+                    successor.connection.sendRequest(req, e.getCallback());
+                }
             }
         }
 
@@ -208,24 +246,29 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         @Override
         ProxyHistory finishReconnect() {
             final ProxyHistory ret = Verify.verifyNotNull(successor);
+
+            for (AbstractProxyTransaction t : proxies.values()) {
+                t.finishReconnect();
+            }
+
             LOG.debug("Finished reconnecting proxy history {}", this);
             lock.unlock();
             return ret;
         }
 
         @Override
-        void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
-                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
+        void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
             if (request instanceof TransactionRequest) {
-                replayTransactionRequest((TransactionRequest<?>) request, callback);
+                forwardTransactionRequest((TransactionRequest<?>) request, callback);
             } else if (request instanceof LocalHistoryRequest) {
-                replayTo.accept(request, callback);
+                forwardTo.accept(request, callback);
             } else {
                 throw new IllegalArgumentException("Unhandled request " + request);
             }
         }
 
-        private void replayTransactionRequest(final TransactionRequest<?> request,
+        private void forwardTransactionRequest(final TransactionRequest<?> request,
                 final Consumer<Response<?, ?>> callback) throws RequestException {
 
             final AbstractProxyTransaction proxy;
@@ -239,7 +282,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 throw new RequestReplayException("Failed to find proxy for %s", request);
             }
 
-            proxy.replayRequest(request, callback);
+            proxy.forwardRequest(request, callback);
         }
     }
 
@@ -248,30 +291,33 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     private final Lock lock = new ReentrantLock();
     private final LocalHistoryIdentifier identifier;
     private final AbstractClientConnection<ShardBackendInfo> connection;
+    private final AbstractClientHistory parent;
 
     @GuardedBy("lock")
     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
     @GuardedBy("lock")
     private ProxyHistory successor;
 
-    private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier) {
+    private ProxyHistory(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
+        this.parent = Preconditions.checkNotNull(parent);
         this.connection = Preconditions.checkNotNull(connection);
         this.identifier = Preconditions.checkNotNull(identifier);
     }
 
-    static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier) {
+    static ProxyHistory createClient(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
-             : new Remote(connection, identifier);
+        return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
+             : new Remote(parent, connection, identifier);
     }
 
-    static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+    static ProxyHistory createSingle(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
-             : new RemoteSingle(connection, identifier);
+        return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
+             : new RemoteSingle(parent, connection, identifier);
     }
 
     @Override
@@ -283,15 +329,20 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return connection.localActor();
     }
 
-    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
+    final AbstractClientHistory parent() {
+        return parent;
+    }
+
+    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+            final boolean snapshotOnly) {
         lock.lock();
         try {
             if (successor != null) {
-                return successor.createTransactionProxy(txId);
+                return successor.createTransactionProxy(txId, snapshotOnly);
             }
 
             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
-            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
+            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
             proxies.put(proxyId, ret);
             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
             return ret;
@@ -322,13 +373,29 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void close() {
+        lock.lock();
+        try {
+            if (successor != null) {
+                successor.close();
+                return;
+            }
+
+            LOG.debug("Proxy {} invoking destroy", this);
+            connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
+                this::onDestroyComplete);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         connection.sendRequest(request, callback);
     }
 
     @GuardedBy("lock")
     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
-            TransactionIdentifier txId);
+            TransactionIdentifier txId, boolean snapshotOnly);
 
     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
 
@@ -342,9 +409,31 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         successor = createSuccessor(newConnection);
         LOG.debug("History {} instantiated successor {}", this, successor);
+
+        for (AbstractProxyTransaction t : proxies.values()) {
+            t.startReconnect();
+        }
+
         return new ReconnectCohort();
     }
 
+    private void onDestroyComplete(final Response<?, ?> response) {
+        LOG.debug("Proxy {} destroy completed with {}", this, response);
+
+        lock.lock();
+        try {
+            parent.onProxyDestroyed(this);
+            connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
+                this::onPurgeComplete);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void onPurgeComplete(final Response<?, ?> response) {
+        LOG.debug("Proxy {} purge completed with {}", this, response);
+    }
+
     @GuardedBy("lock")
     void onTransactionAborted(final AbstractProxyTransaction tx) {
         // No-op for most implementations