BUG-5280: emit DestroyLocalHistory and PurgeLocalHistory requests 54/51954/3 54/51954/4
authorRobert Varga <rovarga@cisco.com>
Thu, 16 Feb 2017 12:38:34 +0000 (13:38 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 17 Feb 2017 12:24:41 +0000 (12:24 +0000)
In order to properly prune local histories from the backend we need
to issue destroy/purge requests as the history is finalized.

Change-Id: I303937ba7afa594e83c4a944d16886ee98e3036c
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java

index 03d2a0377f83050c031202e64b096e39f006e41a..00f7572f986562c2c3d88d8c8743c6266c34157f 100644 (file)
@@ -83,6 +83,19 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
     }
 
+    final synchronized void doClose() {
+        final State local = state;
+        if (local != State.CLOSED) {
+            Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this);
+            histories.values().forEach(ProxyHistory::close);
+        }
+    }
+
+    final synchronized void onProxyDestroyed(final ProxyHistory proxyHistory) {
+        histories.remove(proxyHistory.getIdentifier().getCookie());
+        LOG.debug("{}: removed destroyed proxy {}", this, proxyHistory);
+    }
+
     @Override
     public final LocalHistoryIdentifier getIdentifier() {
         return identifier;
@@ -307,4 +320,5 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
             }
         };
     }
+
 }
index 26b03e39b092c57d00876b030df15d1aeda015e8..4598d95ebb9f7cc0482ccb75a04b73c053c41e04 100644 (file)
@@ -32,11 +32,7 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A
 
     @Override
     public void close() {
-        final State local = state();
-        if (local != State.CLOSED) {
-            Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this);
-            updateState(local, State.CLOSED);
-        }
+        doClose();
     }
 
     private State ensureIdleState() {
@@ -92,6 +88,6 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A
     @Override
     ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
             final AbstractClientConnection<ShardBackendInfo> connection) {
-        return ProxyHistory.createClient(connection, historyId);
+        return ProxyHistory.createClient(this, connection, historyId);
     }
 }
index d6aa3d3f3fc079db3ac4ed12e46c40173f1c8dc4..88e86bc08985f00310444854b20a20e129bd1d50 100644 (file)
@@ -24,7 +24,9 @@ import org.opendaylight.controller.cluster.access.client.AbstractClientConnectio
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
@@ -46,9 +48,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     private abstract static class AbstractLocal extends ProxyHistory {
         private final DataTree dataTree;
 
-        AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
+        AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
-            super(connection, identifier);
+            super(parent, connection, identifier);
             this.dataTree = Preconditions.checkNotNull(dataTree);
         }
 
@@ -58,9 +60,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     }
 
     private abstract static class AbstractRemote extends ProxyHistory {
-        AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
+        AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
+            super(parent, connection, identifier);
         }
     }
 
@@ -74,9 +76,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         private volatile LocalReadWriteProxyTransaction lastSealed;
 
-        Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
-            final DataTree dataTree) {
-            super(connection, identifier, dataTree);
+        Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            super(parent, connection, identifier, dataTree);
         }
 
         @Override
@@ -104,7 +106,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createClient(connection, getIdentifier());
+            return createClient(parent(), connection, getIdentifier());
         }
 
         @Override
@@ -133,9 +135,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     }
 
     private static final class LocalSingle extends AbstractLocal {
-        LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+        LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
-            super(connection, identifier, dataTree);
+            super(parent, connection, identifier, dataTree);
         }
 
         @Override
@@ -148,13 +150,14 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createSingle(connection, getIdentifier());
+            return createSingle(parent(), connection, getIdentifier());
         }
     }
 
     private static final class Remote extends AbstractRemote {
-        Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
+        Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
+            final LocalHistoryIdentifier identifier) {
+            super(parent, connection, identifier);
         }
 
         @Override
@@ -165,14 +168,14 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createClient(connection, getIdentifier());
+            return createClient(parent(), connection, getIdentifier());
         }
     }
 
     private static final class RemoteSingle extends AbstractRemote {
-        RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+        RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
-            super(connection, identifier);
+            super(parent, connection, identifier);
         }
 
         @Override
@@ -183,7 +186,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
-            return createSingle(connection, getIdentifier());
+            return createSingle(parent(), connection, getIdentifier());
         }
     }
 
@@ -288,30 +291,33 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     private final Lock lock = new ReentrantLock();
     private final LocalHistoryIdentifier identifier;
     private final AbstractClientConnection<ShardBackendInfo> connection;
+    private final AbstractClientHistory parent;
 
     @GuardedBy("lock")
     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
     @GuardedBy("lock")
     private ProxyHistory successor;
 
-    private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier) {
+    private ProxyHistory(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
+        this.parent = Preconditions.checkNotNull(parent);
         this.connection = Preconditions.checkNotNull(connection);
         this.identifier = Preconditions.checkNotNull(identifier);
     }
 
-    static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier) {
+    static ProxyHistory createClient(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
-             : new Remote(connection, identifier);
+        return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
+             : new Remote(parent, connection, identifier);
     }
 
-    static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+    static ProxyHistory createSingle(final AbstractClientHistory parent,
+            final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
-             : new RemoteSingle(connection, identifier);
+        return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
+             : new RemoteSingle(parent, connection, identifier);
     }
 
     @Override
@@ -323,6 +329,10 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return connection.localActor();
     }
 
+    final AbstractClientHistory parent() {
+        return parent;
+    }
+
     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
             final boolean snapshotOnly) {
         lock.lock();
@@ -363,6 +373,22 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void close() {
+        lock.lock();
+        try {
+            if (successor != null) {
+                successor.close();
+                return;
+            }
+
+            LOG.debug("Proxy {} invoking destroy", this);
+            connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
+                this::onDestroyComplete);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         connection.sendRequest(request, callback);
     }
@@ -391,6 +417,23 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return new ReconnectCohort();
     }
 
+    private void onDestroyComplete(final Response<?, ?> response) {
+        LOG.debug("Proxy {} destroy completed with {}", this, response);
+
+        lock.lock();
+        try {
+            parent.onProxyDestroyed(this);
+            connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
+                this::onPurgeComplete);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void onPurgeComplete(final Response<?, ?> response) {
+        LOG.debug("Proxy {} purge completed with {}", this, response);
+    }
+
     @GuardedBy("lock")
     void onTransactionAborted(final AbstractProxyTransaction tx) {
         // No-op for most implementations
index 8220bfaaef6ab7e9bf21e84885c0d5ea507c97b0..7c3bba9756043c0b4bad67ede6f272cb1c813111 100644 (file)
@@ -44,6 +44,6 @@ final class SingleClientHistory extends AbstractClientHistory {
     @Override
     ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
             final AbstractClientConnection<ShardBackendInfo> connection) {
-        return ProxyHistory.createSingle(connection, historyId);
+        return ProxyHistory.createSingle(this, connection, historyId);
     }
 }