Properly synchronize NetconfDeviceTopologyAdapter 96/107696/5
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 4 Sep 2023 20:53:33 +0000 (22:53 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 5 Sep 2023 13:01:08 +0000 (15:01 +0200)
The adapter is using TransactionChain, which should always be guarded.
Make sure we guard that even when callers fail to provide their own
guards.

There is a further bug, where a failed last transaction would not
complete the closeFuture(), leaving callers stack.

Finally we disconnect from AutoCloseable and provide a shutdown()
method, which allows users to decide on how to synchronize the datastore
access. This allows us to concurrently shutdown the mount point and
datastore state and wait for the completion of the latter only after
both actions have been initiated.

JIRA: NETCONF-1146
Change-Id: Ieeb6ca485bed3553224f6845d04ae07d4c5d5176
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
apps/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java
apps/netconf-topology/src/main/java/org/opendaylight/netconf/topology/spi/NetconfDeviceTopologyAdapter.java
apps/netconf-topology/src/main/java/org/opendaylight/netconf/topology/spi/NetconfTopologyDeviceSalFacade.java
apps/netconf-topology/src/test/java/org/opendaylight/netconf/topology/spi/NetconfDeviceTopologyAdapterTest.java

index 57546b3e692c2601d19145c27a847e9f049a65a5..c942f989286327da0016e5306e8765d8ddf8f2f9 100644 (file)
@@ -16,6 +16,7 @@ import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
@@ -122,8 +123,17 @@ class MasterSalFacade implements RemoteDeviceHandler, AutoCloseable {
 
     @Override
     public void close() {
-        datastoreAdapter.close();
+        final var future = datastoreAdapter.shutdown();
         mount.close();
+
+        try {
+            future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException("Interrupted while waiting for datastore adapter shutdown", e);
+        } catch (ExecutionException e) {
+            throw new IllegalStateException("Datastore adapter shutdown failed", e);
+        }
     }
 
     private void registerMasterMountPoint() {
index e007f952729de90ef253c82d9e81b29aceb7fc42..3da668f8f1d5dbb7dc3005248b10a27f4a641569 100644 (file)
@@ -11,9 +11,10 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.ExecutionException;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.Transaction;
@@ -47,14 +48,16 @@ import org.opendaylight.yangtools.yang.common.Uint16;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class NetconfDeviceTopologyAdapter implements TransactionChainListener, AutoCloseable {
+public final class NetconfDeviceTopologyAdapter implements TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTopologyAdapter.class);
 
-    private final SettableFuture<Empty> closeFuture = SettableFuture.create();
     private final @NonNull KeyedInstanceIdentifier<Topology, TopologyKey> topologyPath;
     private final DataBroker dataBroker;
     private final RemoteDeviceId id;
 
+    @GuardedBy("this")
+    private SettableFuture<Empty> closeFuture;
+    @GuardedBy("this")
     private TransactionChain txChain;
 
     public NetconfDeviceTopologyAdapter(final DataBroker dataBroker,
@@ -64,97 +67,86 @@ public final class NetconfDeviceTopologyAdapter implements TransactionChainListe
         this.id = requireNonNull(id);
         txChain = dataBroker.createMergingTransactionChain(this);
 
-        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        final var tx = txChain.newWriteOnlyTransaction();
         LOG.trace("{}: Init device state transaction {} putting if absent operational data started.", id,
-            writeTx.getIdentifier());
+            tx.getIdentifier());
         final var nodePath = nodePath();
-        writeTx.put(LogicalDatastoreType.OPERATIONAL, nodePath, new NodeBuilder()
+        tx.put(LogicalDatastoreType.OPERATIONAL, nodePath, new NodeBuilder()
             .withKey(nodePath.getKey())
             .addAugmentation(new NetconfNodeBuilder()
                 .setConnectionStatus(ConnectionStatus.Connecting)
                 .setHost(id.host())
                 .setPort(new PortNumber(Uint16.valueOf(id.address().getPort()))).build())
             .build());
-        LOG.trace("{}: Init device state transaction {} putting operational data ended.", id, writeTx.getIdentifier());
-
-        commitTransaction(writeTx, "init");
+        LOG.trace("{}: Init device state transaction {} putting operational data ended.", id, tx.getIdentifier());
+        commitTransaction(tx, "init");
     }
 
-    private @NonNull KeyedInstanceIdentifier<Node, NodeKey> nodePath() {
-        return topologyPath.child(Node.class, new NodeKey(new NodeId(id.name())));
-    }
-
-    private @NonNull InstanceIdentifier<NetconfNode> netconfNodePath() {
-        return nodePath().augmentation(NetconfNode.class);
-    }
-
-    public void updateDeviceData(final boolean up, final NetconfDeviceCapabilities capabilities,
+    public synchronized void updateDeviceData(final boolean up, final NetconfDeviceCapabilities capabilities,
             final SessionIdType sessionId) {
-        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
-        LOG.trace("{}: Update device state transaction {} merging operational data started.",
-                id, writeTx.getIdentifier());
+        final var tx = txChain.newWriteOnlyTransaction();
+        LOG.trace("{}: Update device state transaction {} merging operational data started.", id, tx.getIdentifier());
 
         // FIXME: this needs to be tied together with node's operational existence
-        writeTx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(),
+        tx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(),
             newNetconfNodeBuilder(up, capabilities, sessionId).build());
-        LOG.trace("{}: Update device state transaction {} merging operational data ended.",
-                id, writeTx.getIdentifier());
-
-        commitTransaction(writeTx, "update");
+        LOG.trace("{}: Update device state transaction {} merging operational data ended.", id, tx.getIdentifier());
+        commitTransaction(tx, "update");
     }
 
-    public void updateClusteredDeviceData(final boolean up, final String masterAddress,
+    public synchronized void updateClusteredDeviceData(final boolean up, final String masterAddress,
             final NetconfDeviceCapabilities capabilities, final SessionIdType sessionId) {
-        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
-        LOG.trace("{}: Update device state transaction {} merging operational data started.",
-                id, writeTx.getIdentifier());
-        writeTx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(),
+        final var tx = txChain.newWriteOnlyTransaction();
+        LOG.trace("{}: Update device state transaction {} merging operational data started.", id, tx.getIdentifier());
+        tx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(),
             newNetconfNodeBuilder(up, capabilities, sessionId)
                 .setClusteredConnectionStatus(new ClusteredConnectionStatusBuilder()
                     .setNetconfMasterNode(masterAddress)
                     .build())
                 .build());
-        LOG.trace("{}: Update device state transaction {} merging operational data ended.",
-                id, writeTx.getIdentifier());
+        LOG.trace("{}: Update device state transaction {} merging operational data ended.", id, tx.getIdentifier());
 
-        commitTransaction(writeTx, "update");
+        commitTransaction(tx, "update");
     }
 
-    @Override
-    public void onTransactionChainFailed(final TransactionChain chain, final Transaction transaction,
-            final Throwable cause) {
-        LOG.warn("{}: TransactionChain({}) {} FAILED!", id, chain, transaction.getIdentifier(), cause);
-        chain.close();
-
-        txChain = dataBroker.createMergingTransactionChain(this);
-        LOG.info("{}: TransactionChain reset to {}", id, txChain);
-        // FIXME: restart last update
+    public synchronized void setDeviceAsFailed(final Throwable throwable) {
+        final var data = new NetconfNodeBuilder()
+                .setHost(id.host())
+                .setPort(new PortNumber(Uint16.valueOf(id.address().getPort())))
+                .setConnectionStatus(ConnectionStatus.UnableToConnect)
+                .setConnectedMessage(extractReason(throwable))
+                .build();
+
+        final var tx = txChain.newWriteOnlyTransaction();
+        LOG.trace("{}: Setting device state as failed {} putting operational data started.", id, tx.getIdentifier());
+        tx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(), data);
+        LOG.trace("{}: Setting device state as failed {} putting operational data ended.", id, tx.getIdentifier());
+        commitTransaction(tx, "update-failed-device");
     }
 
-    @Override
-    public void onTransactionChainSuccessful(final TransactionChain chain) {
-        LOG.trace("{}: TransactionChain({}) SUCCESSFUL", id, chain);
-        closeFuture.set(Empty.value());
+    public synchronized ListenableFuture<Empty> shutdown() {
+        if (closeFuture != null) {
+            return closeFuture;
+        }
+
+        closeFuture = SettableFuture.create();
+
+        final var tx = txChain.newWriteOnlyTransaction();
+        LOG.trace("{}: Close device state transaction {} removing all data started.", id, tx.getIdentifier());
+        tx.delete(LogicalDatastoreType.OPERATIONAL, nodePath());
+        LOG.trace("{}: Close device state transaction {} removing all data ended.", id, tx.getIdentifier());
+        commitTransaction(tx, "close");
+
+        txChain.close();
+        return closeFuture;
     }
 
-    public void setDeviceAsFailed(final Throwable throwable) {
-        String reason = throwable != null && throwable.getMessage() != null ? throwable.getMessage() : "Unknown reason";
+    private @NonNull KeyedInstanceIdentifier<Node, NodeKey> nodePath() {
+        return topologyPath.child(Node.class, new NodeKey(new NodeId(id.name())));
+    }
 
-        final NetconfNode data = new NetconfNodeBuilder()
-                .setHost(id.host())
-                .setPort(new PortNumber(Uint16.valueOf(id.address().getPort())))
-                .setConnectionStatus(ConnectionStatus.UnableToConnect).setConnectedMessage(reason).build();
-
-        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
-        LOG.trace(
-                "{}: Setting device state as failed {} putting operational data started.",
-                id, writeTx.getIdentifier());
-        writeTx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, netconfNodePath(), data);
-        LOG.trace(
-                "{}: Setting device state as failed {} putting operational data ended.",
-                id, writeTx.getIdentifier());
-
-        commitTransaction(writeTx, "update-failed-device");
+    private @NonNull InstanceIdentifier<NetconfNode> netconfNodePath() {
+        return nodePath().augmentation(NetconfNode.class);
     }
 
     private NetconfNodeBuilder newNetconfNodeBuilder(final boolean up, final NetconfDeviceCapabilities capabilities,
@@ -198,20 +190,35 @@ public final class NetconfDeviceTopologyAdapter implements TransactionChainListe
     }
 
     @Override
-    public void close() {
-        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
-        LOG.trace("{}: Close device state transaction {} removing all data started.", id, writeTx.getIdentifier());
-        writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodePath());
-        LOG.trace("{}: Close device state transaction {} removing all data ended.", id, writeTx.getIdentifier());
-        commitTransaction(writeTx, "close");
+    public synchronized void onTransactionChainFailed(final TransactionChain chain, final Transaction transaction,
+            final Throwable cause) {
+        LOG.warn("{}: TransactionChain({}) {} FAILED!", id, chain, transaction.getIdentifier(), cause);
+        if (closeFuture != null) {
+            closeFuture.setException(cause);
+            return;
+        }
 
-        txChain.close();
+        // FIXME: move this up once we have MDSAL-838 fixed
+        chain.close();
+
+        txChain = dataBroker.createMergingTransactionChain(this);
+        LOG.info("{}: TransactionChain reset to {}", id, txChain);
+        // FIXME: restart last update
+    }
 
-        try {
-            closeFuture.get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("{}: Transaction(close) {} FAILED!", id, writeTx.getIdentifier(), e);
-            throw new IllegalStateException(id + "  Transaction(close) not committed correctly", e);
+    @Override
+    public synchronized void onTransactionChainSuccessful(final TransactionChain chain) {
+        LOG.trace("{}: TransactionChain({}) SUCCESSFUL", id, chain);
+        closeFuture.set(Empty.value());
+    }
+
+    private static @NonNull String extractReason(final Throwable throwable) {
+        if (throwable != null) {
+            final var message = throwable.getMessage();
+            if (message != null) {
+                return message;
+            }
         }
+        return "Unknown reason";
     }
 }
index 5cbd4292456a3ab7053b66dd3f93dee8a8737299..2e586b73b913025922fb6cc7a862f5e8cbc0b8e3 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.netconf.topology.spi;
 
+import java.util.concurrent.ExecutionException;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCapabilities;
@@ -48,8 +49,17 @@ public class NetconfTopologyDeviceSalFacade extends NetconfDeviceSalFacade {
     }
 
     @Override
-    public void close() {
-        datastoreAdapter.close();
+    public synchronized void close() {
+        final var future = datastoreAdapter.shutdown();
         super.close();
+
+        try {
+            future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException("Interrupted while waiting for datastore adapter shutdown", e);
+        } catch (ExecutionException e) {
+            throw new IllegalStateException("Datastore adapter shutdown failed", e);
+        }
     }
 }
index db38ad754d0f6f6971a2444e3b79b3c6bcd07c93..488ef954df4354dfa98f31d5afb7797706ad7fc8 100644 (file)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.netconf.topology.spi;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
@@ -14,7 +17,9 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import com.google.common.util.concurrent.Futures;
 import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,6 +44,7 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.common.Uint32;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
@@ -110,12 +116,47 @@ public class NetconfDeviceTopologyAdapterTest {
         // FIXME: exact match
         doNothing().when(mockTx).delete(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class));
         doNothing().when(mockChain).close();
+
+        final var future = adapter.shutdown();
+        verify(mockChain, times(2)).newWriteOnlyTransaction();
+        verify(mockTx).delete(LogicalDatastoreType.OPERATIONAL,
+            TEST_TOPOLOGY_ID.child(Node.class, new NodeKey(new NodeId(id.name()))));
+        verify(mockTx, times(2)).commit();
+        verify(mockChain).close();
+
+        assertFalse(future.isDone());
+
+        // Idempotent
+        assertSame(future, adapter.shutdown());
+
+        // future completes
         listeners.getValue().onTransactionChainSuccessful(mockChain);
-        adapter.close();
+        assertSame(Empty.value(), Futures.getDone(future));
+    }
 
+
+    @Test
+    public void testShutdownCompletion() throws Exception {
+        // FIXME: exact match
+        doNothing().when(mockTx).delete(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class));
+        doNothing().when(mockChain).close();
+
+        final var future = adapter.shutdown();
         verify(mockChain, times(2)).newWriteOnlyTransaction();
         verify(mockTx).delete(LogicalDatastoreType.OPERATIONAL,
             TEST_TOPOLOGY_ID.child(Node.class, new NodeKey(new NodeId(id.name()))));
         verify(mockTx, times(2)).commit();
+        verify(mockChain).close();
+
+        assertFalse(future.isDone());
+
+        // Idempotent
+        assertSame(future, adapter.shutdown());
+
+        // future completes
+        final var cause = new Throwable();
+        listeners.getValue().onTransactionChainFailed(mockChain, mockTx, cause);
+        final var ex = assertThrows(ExecutionException.class, () -> Futures.getDone(future));
+        assertSame(cause, ex.getCause());
     }
 }