BUG-5280: emit DestroyLocalHistory and PurgeLocalHistory requests
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index d6aa3d3f3fc079db3ac4ed12e46c40173f1c8dc4..88e86bc08985f00310444854b20a20e129bd1d50 100644 (file)
@@ -24,7 +24,9 @@ import org.opendaylight.controller.cluster.access.client.AbstractClientConnectio
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
@@ -46,9 +48,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     private abstract static class AbstractLocal extends ProxyHistory {
         private final DataTree dataTree;
 
-        AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
+        AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
-            super(connection, identifier);
+            super(parent, connection, identifier);
             this.dataTree = Preconditions.checkNotNull(dataTree);
         }
 
@@ -58,9 +60,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     }
 
     private abstract static class AbstractRemote extends ProxyHistory {
-        AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
+        AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
+            super(parent, connection, identifier);
         }
     }
 
@@ -74,9 +76,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         private volatile LocalReadWriteProxyTransaction lastSealed;
 
-        Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
-            final DataTree dataTree) {
-            super(connection, identifier, dataTree);
+        Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            super(parent, connection, identifier, dataTree);
         }
 
         @Override
@@ -104,7 +106,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createClient(connection, getIdentifier());
+            return createClient(parent(), connection, getIdentifier());
         }
 
         @Override
@@ -133,9 +135,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     }
 
     private static final class LocalSingle extends AbstractLocal {
-        LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+        LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
-            super(connection, identifier, dataTree);
+            super(parent, connection, identifier, dataTree);
         }
 
         @Override
@@ -148,13 +150,14 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createSingle(connection, getIdentifier());
+            return createSingle(parent(), connection, getIdentifier());
         }
     }
 
     private static final class Remote extends AbstractRemote {
-        Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
+        Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier) {
+            super(parent, connection, identifier);
         }
 
         @Override
@@ -165,14 +168,14 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createClient(connection, getIdentifier());
+            return createClient(parent(), connection, getIdentifier());
         }
     }
 
     private static final class RemoteSingle extends AbstractRemote {
-        RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+        RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
+            super(parent, connection, identifier);
         }
 
         @Override
@@ -183,7 +186,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createSingle(connection, getIdentifier());
+            return createSingle(parent(), connection, getIdentifier());
         }
     }
 
@@ -288,30 +291,33 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     private final Lock lock = new ReentrantLock();
     private final LocalHistoryIdentifier identifier;
     private final AbstractClientConnection<ShardBackendInfo> connection;
+    private final AbstractClientHistory parent;
 
     @GuardedBy("lock")
     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
     @GuardedBy("lock")
     private ProxyHistory successor;
 
-    private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier) {
+    private ProxyHistory(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
+        this.parent = Preconditions.checkNotNull(parent);
         this.connection = Preconditions.checkNotNull(connection);
         this.identifier = Preconditions.checkNotNull(identifier);
     }
 
-    static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier) {
+    static ProxyHistory createClient(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
-             : new Remote(connection, identifier);
+        return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
+             : new Remote(parent, connection, identifier);
     }
 
-    static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+    static ProxyHistory createSingle(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
-             : new RemoteSingle(connection, identifier);
+        return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
+             : new RemoteSingle(parent, connection, identifier);
     }
 
     @Override
@@ -323,6 +329,10 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return connection.localActor();
     }
 
+    final AbstractClientHistory parent() {
+        return parent;
+    }
+
     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
             final boolean snapshotOnly) {
         lock.lock();
@@ -363,6 +373,22 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void close() {
+        lock.lock();
+        try {
+            if (successor != null) {
+                successor.close();
+                return;
+            }
+
+            LOG.debug("Proxy {} invoking destroy", this);
+            connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
+                this::onDestroyComplete);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         connection.sendRequest(request, callback);
     }
@@ -391,6 +417,23 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return new ReconnectCohort();
     }
 
+    private void onDestroyComplete(final Response<?, ?> response) {
+        LOG.debug("Proxy {} destroy completed with {}", this, response);
+
+        lock.lock();
+        try {
+            parent.onProxyDestroyed(this);
+            connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
+                this::onPurgeComplete);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void onPurgeComplete(final Response<?, ?> response) {
+        LOG.debug("Proxy {} purge completed with {}", this, response);
+    }
+
     @GuardedBy("lock")
     void onTransactionAborted(final AbstractProxyTransaction tx) {
         // No-op for most implementations