BUG-8372: fix abort message confusion
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index 846f5c37cfd506eb28f9e01139b0333851492281..827c19e526fcbd5f6b3cc1c33dfdc40698af11e9 100644 (file)
@@ -24,7 +24,9 @@ import org.opendaylight.controller.cluster.access.client.AbstractClientConnectio
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
@@ -46,9 +48,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     private abstract static class AbstractLocal extends ProxyHistory {
         private final DataTree dataTree;
 
-        AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
+        AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
-            super(connection, identifier);
+            super(parent, connection, identifier);
             this.dataTree = Preconditions.checkNotNull(dataTree);
         }
 
@@ -58,16 +60,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     }
 
     private abstract static class AbstractRemote extends ProxyHistory {
-        AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
+        AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
-        }
-
-        @Override
-        final AbstractProxyTransaction doCreateTransactionProxy(
-                final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId,
-                final boolean snapshotOnly) {
-            return new RemoteProxyTransaction(this, txId, snapshotOnly);
+            super(parent, connection, identifier);
         }
     }
 
@@ -81,9 +76,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         private volatile LocalReadWriteProxyTransaction lastSealed;
 
-        Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
-            final DataTree dataTree) {
-            super(connection, identifier, dataTree);
+        Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            super(parent, connection, identifier, dataTree);
         }
 
         @Override
@@ -111,7 +106,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createClient(connection, getIdentifier());
+            return createClient(parent(), connection, getIdentifier());
         }
 
         @Override
@@ -140,9 +135,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     }
 
     private static final class LocalSingle extends AbstractLocal {
-        LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+        LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
-            super(connection, identifier, dataTree);
+            super(parent, connection, identifier, dataTree);
         }
 
         @Override
@@ -155,30 +150,43 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createSingle(connection, getIdentifier());
+            return createSingle(parent(), connection, getIdentifier());
         }
     }
 
     private static final class Remote extends AbstractRemote {
-        Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
+        Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier) {
+            super(parent, connection, identifier);
+        }
+
+        @Override
+        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
         }
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createClient(connection, getIdentifier());
+            return createClient(parent(), connection, getIdentifier());
         }
     }
 
     private static final class RemoteSingle extends AbstractRemote {
-        RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+        RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
+            super(parent, connection, identifier);
+        }
+
+        @Override
+        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
         }
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createSingle(connection, getIdentifier());
+            return createSingle(parent(), connection, getIdentifier());
         }
     }
 
@@ -203,7 +211,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @GuardedBy("lock")
         @Override
-        void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
+        void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
             // First look for our Create message
             for (ConnectionEntry e : previousEntries) {
                 final Request<?, ?> req = e.getRequest();
@@ -249,18 +257,18 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
 
         @Override
-        void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
-                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
+        void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
             if (request instanceof TransactionRequest) {
-                replayTransactionRequest((TransactionRequest<?>) request, callback);
+                forwardTransactionRequest((TransactionRequest<?>) request, callback);
             } else if (request instanceof LocalHistoryRequest) {
-                replayTo.accept(request, callback);
+                forwardTo.accept(request, callback);
             } else {
                 throw new IllegalArgumentException("Unhandled request " + request);
             }
         }
 
-        private void replayTransactionRequest(final TransactionRequest<?> request,
+        private void forwardTransactionRequest(final TransactionRequest<?> request,
                 final Consumer<Response<?, ?>> callback) throws RequestException {
 
             final AbstractProxyTransaction proxy;
@@ -274,7 +282,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 throw new RequestReplayException("Failed to find proxy for %s", request);
             }
 
-            proxy.replayRequest(request, callback);
+            proxy.forwardRequest(request, callback);
         }
     }
 
@@ -283,30 +291,33 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     private final Lock lock = new ReentrantLock();
     private final LocalHistoryIdentifier identifier;
     private final AbstractClientConnection<ShardBackendInfo> connection;
+    private final AbstractClientHistory parent;
 
     @GuardedBy("lock")
     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
     @GuardedBy("lock")
     private ProxyHistory successor;
 
-    private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier) {
+    private ProxyHistory(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
+        this.parent = Preconditions.checkNotNull(parent);
         this.connection = Preconditions.checkNotNull(connection);
         this.identifier = Preconditions.checkNotNull(identifier);
     }
 
-    static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier) {
+    static ProxyHistory createClient(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
-             : new Remote(connection, identifier);
+        return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
+             : new Remote(parent, connection, identifier);
     }
 
-    static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+    static ProxyHistory createSingle(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
-             : new RemoteSingle(connection, identifier);
+        return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
+             : new RemoteSingle(parent, connection, identifier);
     }
 
     @Override
@@ -318,6 +329,10 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return connection.localActor();
     }
 
+    final AbstractClientHistory parent() {
+        return parent;
+    }
+
     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
             final boolean snapshotOnly) {
         lock.lock();
@@ -358,6 +373,22 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void close() {
+        lock.lock();
+        try {
+            if (successor != null) {
+                successor.close();
+                return;
+            }
+
+            LOG.debug("Proxy {} invoking destroy", this);
+            connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
+                this::onDestroyComplete);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         connection.sendRequest(request, callback);
     }
@@ -386,6 +417,23 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return new ReconnectCohort();
     }
 
+    private void onDestroyComplete(final Response<?, ?> response) {
+        LOG.debug("Proxy {} destroy completed with {}", this, response);
+
+        lock.lock();
+        try {
+            parent.onProxyDestroyed(this);
+            connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
+                this::onPurgeComplete);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void onPurgeComplete(final Response<?, ?> response) {
+        LOG.debug("Proxy {} purge completed with {}", this, response);
+    }
+
     @GuardedBy("lock")
     void onTransactionAborted(final AbstractProxyTransaction tx) {
         // No-op for most implementations