Use DTCL to maintain LSPDB versions 22/98622/6
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 19 Nov 2021 18:56:40 +0000 (19:56 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 22 Nov 2021 12:54:59 +0000 (13:54 +0100)
Rather than issuing synchronous writes, use a DTCL to acquire
versions populated in the datastore. This eliminates a failure code
path and makes connections a tad faster.

JIRA: BGPCEP-989
Change-Id: I90c45dfac9a610f1d153eec1d95fbb50bad86800
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPStatefulPeerProposal.java
pcep/topology/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPStatefulPeerProposalTest.java

index 8c1a72f7f2a9fb7e2a64507aa1622d755dafa33e..bd0e1c2a7697f939fb4132c16f75fe353af9b58f 100644 (file)
@@ -8,20 +8,17 @@
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
 import static com.google.common.base.Verify.verifyNotNull;
-import static java.util.Objects.requireNonNull;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.annotations.VisibleForTesting;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
 import org.opendaylight.mdsal.binding.api.DataTreeModification;
-import org.opendaylight.mdsal.binding.api.ReadTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.protocol.pcep.PCEPPeerProposal;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.sync.optimizations.rev200720.PathComputationClient1;
@@ -39,51 +36,83 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yangtools.concepts.AbstractRegistration;
 import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 final class PCEPStatefulPeerProposal extends AbstractRegistration implements PCEPPeerProposal {
-    private static final class SpeakerIdListener implements ClusteredDataTreeChangeListener<PcepNodeSyncConfig> {
-        final Map<NodeId, byte[]> map = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    private abstract static class AbstractListener<D extends DataObject, V>
+            implements ClusteredDataTreeChangeListener<D> {
+        final Map<NodeId, V> map = new ConcurrentHashMap<>();
         final Registration reg;
 
+        AbstractListener(final DataBroker dataBroker, final @NonNull LogicalDatastoreType datastore,
+                final @NonNull InstanceIdentifier<D> wildcard) {
+            reg = dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(datastore, wildcard), this);
+        }
+
+        final void remove(final DataTreeModification<?> modification) {
+            map.remove(extractNodeId(modification));
+        }
+
+        final void update(final DataTreeModification<?> modification, final V value) {
+            final var nodeId = extractNodeId(modification);
+            if (value != null) {
+                map.put(nodeId, value);
+            } else {
+                map.remove(nodeId);
+            }
+        }
+
+        private static @NonNull NodeId extractNodeId(final DataTreeModification<?> modification) {
+            return verifyNotNull(modification.getRootPath().getRootIdentifier().firstKeyOf(Node.class)).getNodeId();
+        }
+    }
+
+    @VisibleForTesting
+    static final class SpeakerIdListener extends AbstractListener<PcepNodeSyncConfig, byte[]> {
         SpeakerIdListener(final DataBroker dataBroker, final InstanceIdentifier<Topology> topologyId) {
-            reg = dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(
-                LogicalDatastoreType.CONFIGURATION, topologyId.child(Node.class).augmentation(PcepNodeConfig.class)
-                    .child(SessionConfig.class).augmentation(PcepNodeSyncConfig.class)), this);
+            super(dataBroker, LogicalDatastoreType.CONFIGURATION, topologyId.child(Node.class)
+                .augmentation(PcepNodeConfig.class).child(SessionConfig.class)
+                .augmentation(PcepNodeSyncConfig.class));
         }
 
         @Override
         public void onDataTreeChanged(final Collection<DataTreeModification<PcepNodeSyncConfig>> changes) {
             for (var change : changes) {
-                final var nodeId = verifyNotNull(change.getRootPath().getRootIdentifier().firstKeyOf(Node.class))
-                    .getNodeId();
                 final var config = change.getRootNode().getDataAfter();
                 if (config != null) {
-                    final var speakerEntityId = config.getSpeakerEntityIdValue();
-                    if (speakerEntityId != null) {
-                        map.put(nodeId, speakerEntityId);
-                        continue;
-                    }
+                    update(change, config.getSpeakerEntityIdValue());
+                } else {
+                    remove(change);
                 }
-                map.remove(nodeId);
             }
         }
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger(PCEPStatefulPeerProposal.class);
+    @VisibleForTesting
+    static final class LspDbVersionListener extends AbstractListener<LspDbVersion, LspDbVersion> {
+        LspDbVersionListener(final DataBroker dataBroker, final InstanceIdentifier<Topology> topologyId) {
+            super(dataBroker, LogicalDatastoreType.OPERATIONAL, topologyId.child(Node.class)
+                .augmentation(Node1.class).child(PathComputationClient.class)
+                .augmentation(PathComputationClient1.class).child(LspDbVersion.class));
+        }
 
-    private final InstanceIdentifier<Topology> topologyId;
+        @Override
+        public void onDataTreeChanged(final Collection<DataTreeModification<LspDbVersion>> changes) {
+            for (var change : changes) {
+                update(change, change.getRootNode().getDataAfter());
+            }
+        }
+    }
+
+    private final LspDbVersionListener lspDbVersions;
     private final SpeakerIdListener speakerIds;
-    private final DataBroker dataBroker;
 
     PCEPStatefulPeerProposal(final DataBroker dataBroker, final InstanceIdentifier<Topology> topologyId) {
-        this.dataBroker = requireNonNull(dataBroker);
-        this.topologyId = requireNonNull(topologyId);
+        lspDbVersions = new LspDbVersionListener(dataBroker, topologyId);
         speakerIds = new SpeakerIdListener(dataBroker, topologyId);
     }
 
@@ -99,31 +128,17 @@ final class PCEPStatefulPeerProposal extends AbstractRegistration implements PCE
             return;
         }
 
-        final var addr = address.getAddress();
-        final var nodeId = ServerSessionManager.createNodeId(addr);
-        // FIXME: BGPCEP-989: acquire this information via a DTCL and perform a simple lookup only
-        Optional<LspDbVersion> result = Optional.empty();
-        try (ReadTransaction rTx = dataBroker.newReadOnlyTransaction()) {
-            // FIXME: we should be listening for this configuration and keep a proper cache
-            final ListenableFuture<Optional<LspDbVersion>> future = rTx.read(LogicalDatastoreType.OPERATIONAL,
-                topologyId.child(Node.class, new NodeKey(nodeId))
-                    .augmentation(Node1.class).child(PathComputationClient.class)
-                    .augmentation(PathComputationClient1.class).child(LspDbVersion.class));
-            try {
-                result = future.get();
-            } catch (final InterruptedException | ExecutionException e) {
-                LOG.warn("Failed to read toplogy {}.", InstanceIdentifier.keyOf(topologyId), e);
-            }
-        }
-
+        final var nodeId = ServerSessionManager.createNodeId(address.getAddress());
+        final var dbVersion = lspDbVersions.map.get(nodeId);
         final var speakerId = speakerIds.map.get(nodeId);
-        if (speakerId == null && !result.isPresent()) {
+        if (speakerId == null && dbVersion == null) {
+            // Nothing to add
             return;
         }
 
         final Tlvs3Builder syncBuilder = new Tlvs3Builder();
-        if (result.isPresent()) {
-            syncBuilder.setLspDbVersion(result.get());
+        if (dbVersion != null) {
+            syncBuilder.setLspDbVersion(dbVersion);
         }
         if (speakerId != null) {
             syncBuilder.setSpeakerEntityId(new SpeakerEntityIdBuilder().setSpeakerEntityIdValue(speakerId).build());
@@ -133,6 +148,7 @@ final class PCEPStatefulPeerProposal extends AbstractRegistration implements PCE
 
     @Override
     protected void removeRegistration() {
+        lspDbVersions.reg.close();
         speakerIds.reg.close();
     }
 }
index 2d89392f56449f9febc0f3a9114b1e2d6f9b0abd..8697e9cf9a33f958452fb75affc36d7b9fdab9e0 100644 (file)
@@ -13,7 +13,6 @@ import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 import com.google.common.util.concurrent.FluentFuture;
@@ -26,15 +25,17 @@ import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.opendaylight.bgpcep.pcep.topology.provider.PCEPStatefulPeerProposal.LspDbVersionListener;
+import org.opendaylight.bgpcep.pcep.topology.provider.PCEPStatefulPeerProposal.SpeakerIdListener;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.DataObjectModification;
 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
 import org.opendaylight.mdsal.binding.api.DataTreeModification;
-import org.opendaylight.mdsal.binding.api.ReadTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.sync.optimizations.rev200720.Stateful1Builder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.sync.optimizations.rev200720.Tlvs3;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.sync.optimizations.rev200720.Tlvs3Builder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.sync.optimizations.rev200720.lsp.db.version.tlv.LspDbVersion;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.sync.optimizations.rev200720.lsp.db.version.tlv.LspDbVersionBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.sync.optimizations.rev200720.speaker.entity.id.tlv.SpeakerEntityIdBuilder;
@@ -68,8 +69,6 @@ public class PCEPStatefulPeerProposalTest {
     private ListenerRegistration<?> listenerReg;
     @Mock
     private FluentFuture<Optional<LspDbVersion>> listenableFutureMock;
-    @Mock
-    private ReadTransaction rt;
 
     private ArgumentCaptor<DataTreeChangeListener<?>> captor;
     private TlvsBuilder tlvsBuilder;
@@ -82,35 +81,66 @@ public class PCEPStatefulPeerProposalTest {
         captor = ArgumentCaptor.forClass(DataTreeChangeListener.class);
         doReturn(listenerReg).when(dataBroker).registerDataTreeChangeListener(any(), captor.capture());
         doNothing().when(listenerReg).close();
-        doReturn(rt).when(dataBroker).newReadOnlyTransaction();
-        doNothing().when(rt).close();
-        doReturn(listenableFutureMock).when(rt).read(any(), any());
     }
 
     @Test
     public void testSetPeerProposalSuccess() throws Exception {
-        doReturn(Optional.of(LSP_DB_VERSION)).when(listenableFutureMock).get();
-        updateBuilder();
-        assertEquals(LSP_DB_VERSION, tlvsBuilder.augmentation(Tlvs3.class).getLspDbVersion());
+        updateBuilder(() -> {
+            final var listeners = captor.getAllValues();
+            assertEquals(2, listeners.size());
+
+            // not entirely accurate, but works well enough
+            final var modPath = TOPOLOGY_IID.child(Node.class,
+                new NodeKey(ServerSessionManager.createNodeId(ADDRESS.getAddress())));
+
+            final var dbverRoot = mock(DataObjectModification.class);
+            doReturn(LSP_DB_VERSION).when(dbverRoot).getDataAfter();
+            final var dbverMod = mock(DataTreeModification.class);
+            doReturn(DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, modPath)).when(dbverMod).getRootPath();
+            doReturn(dbverRoot).when(dbverMod).getRootNode();
+
+            for (DataTreeChangeListener<?> listener : listeners) {
+                if (listener instanceof LspDbVersionListener) {
+                    listener.onDataTreeChanged(List.of(dbverMod));
+                }
+            }
+
+            // Mock lspdb
+        });
+        assertEquals(new Tlvs3Builder().setLspDbVersion(LSP_DB_VERSION).build(), tlvsBuilder.augmentation(Tlvs3.class));
     }
 
     @Test
     public void testSetPeerProposalWithEntityIdSuccess() throws Exception {
-        doReturn(Optional.of(LSP_DB_VERSION)).when(listenableFutureMock).get();
-
         updateBuilder(() -> {
-            final var modification = mock(DataTreeModification.class);
-            doReturn(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
-                // not entirely accurate, but works well enough
-                TOPOLOGY_IID.child(Node.class, new NodeKey(ServerSessionManager.createNodeId(ADDRESS.getAddress())))))
-                .when(modification).getRootPath();
-
-            final var modificationRoot = mock(DataObjectModification.class);
-            doReturn(new PcepNodeSyncConfigBuilder().setSpeakerEntityIdValue(SPEAKER_ID).build()).when(modificationRoot)
+            final var listeners = captor.getAllValues();
+            assertEquals(2, listeners.size());
+
+            // not entirely accurate, but works well enough
+            final var modPath = TOPOLOGY_IID.child(Node.class,
+                new NodeKey(ServerSessionManager.createNodeId(ADDRESS.getAddress())));
+
+            final var dbverRoot = mock(DataObjectModification.class);
+            doReturn(LSP_DB_VERSION).when(dbverRoot).getDataAfter();
+            final var dbverMod = mock(DataTreeModification.class);
+            doReturn(DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, modPath)).when(dbverMod).getRootPath();
+            doReturn(dbverRoot).when(dbverMod).getRootNode();
+
+            final var speakerRoot = mock(DataObjectModification.class);
+            doReturn(new PcepNodeSyncConfigBuilder().setSpeakerEntityIdValue(SPEAKER_ID).build()).when(speakerRoot)
                 .getDataAfter();
-            doReturn(modificationRoot).when(modification).getRootNode();
+            final var speakerMod = mock(DataTreeModification.class);
+            doReturn(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION, modPath)).when(speakerMod)
+                .getRootPath();
+            doReturn(speakerRoot).when(speakerMod).getRootNode();
 
-            captor.getValue().onDataTreeChanged(List.of(modification));
+            for (DataTreeChangeListener<?> listener : listeners) {
+                if (listener instanceof SpeakerIdListener) {
+                    listener.onDataTreeChanged(List.of(speakerMod));
+                } else if (listener instanceof LspDbVersionListener) {
+                    listener.onDataTreeChanged(List.of(dbverMod));
+                }
+            }
         });
         final Tlvs3 aug = tlvsBuilder.augmentation(Tlvs3.class);
         assertNotNull(aug);
@@ -121,14 +151,6 @@ public class PCEPStatefulPeerProposalTest {
 
     @Test
     public void testSetPeerProposalAbsent() throws Exception {
-        doReturn(Optional.empty()).when(listenableFutureMock).get();
-        updateBuilder();
-        assertNull(tlvsBuilder.augmentation(Tlvs3.class));
-    }
-
-    @Test
-    public void testSetPeerProposalFailure() throws Exception {
-        doThrow(new InterruptedException()).when(listenableFutureMock).get();
         updateBuilder();
         assertNull(tlvsBuilder.augmentation(Tlvs3.class));
     }