BGPCEP-709: Operational OpenConfig BGP RIB output is not seen for all controllers... 72/64772/9
authorAjay Lele <ajayslele@gmail.com>
Thu, 26 Oct 2017 19:04:59 +0000 (12:04 -0700)
committerClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Tue, 14 Nov 2017 20:21:26 +0000 (21:21 +0100)
- StateProviderImpl uses clustering singleton service to run only 1 instance in cluster.
  Problem is that if BGP speaker needs to be kept separate across the cluster nodes
  e.g. for scalability issues (creating separate shard for bgp rib and openconfig module and
  disabling replication for them), since the singleton service name is same across
  the nodes, only 1 instance of the service runs and so operational data is available
  only on that node.
- This PR changes this to run operational data collection/update service on all nodes,
  but perform the activity only for rib and peer that are active on that node.
- Updated unit-tests.

Change-Id: I7284b249b8ec68b4c6905a92bbb221816d196cae
Signed-off-by: Ajay Lele <ajayslele@gmail.com>
bgp/openconfig-state/src/main/java/org/opendaylight/protocol/bgp/state/StateProviderImpl.java
bgp/openconfig-state/src/main/resources/org/opendaylight/blueprint/bgp-openconfig-state.xml
bgp/openconfig-state/src/test/java/org/opendaylight/protocol/bgp/state/StateProviderImplTest.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/ApplicationPeer.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPPeer.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/RIBImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/state/BGPPeerStateImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/state/BGPRIBStateImpl.java
bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/state/BGPPeerState.java
bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/state/BGPRIBState.java

index c90a2283f68a94977c8357efeed37d3181f0930c..7d7c9d021eb98dc179a19b4432f67d0aa6bba1a9 100644 (file)
@@ -10,8 +10,6 @@ package org.opendaylight.protocol.bgp.state;
 
 import static java.util.Objects.requireNonNull;
 
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.ScheduledFuture;
 import java.util.HashMap;
@@ -32,11 +30,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
-import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
 import org.opendaylight.protocol.bgp.rib.spi.state.BGPPeerState;
 import org.opendaylight.protocol.bgp.rib.spi.state.BGPRIBState;
@@ -62,10 +55,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @ThreadSafe
-public final class StateProviderImpl implements TransactionChainListener, ClusterSingletonService, AutoCloseable {
+public final class StateProviderImpl implements TransactionChainListener, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(StateProviderImpl.class);
-    private static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier
-        .create("bgp-state-provider-service-group");
     private final BGPStateConsumer stateCollector;
     private final BGPTableTypeRegistryConsumer bgpTableTypeRegistry;
     private final KeyedInstanceIdentifier<NetworkInstance, NetworkInstanceKey> networkInstanceIId;
@@ -73,27 +64,23 @@ public final class StateProviderImpl implements TransactionChainListener, Cluste
     private final DataBroker dataBroker;
     @GuardedBy("this")
     private final Map<String, InstanceIdentifier<Bgp>> instanceIdentifiersCache = new HashMap<>();
-    private ClusterSingletonServiceRegistration singletonServiceRegistration;
     @GuardedBy("this")
     private BindingTransactionChain transactionChain;
     @GuardedBy("this")
     private ScheduledFuture<?> scheduleTask;
 
     public StateProviderImpl(@Nonnull final DataBroker dataBroker, final int timeout,
-        @Nonnull BGPTableTypeRegistryConsumer bgpTableTypeRegistry, @Nonnull final BGPStateConsumer stateCollector,
-        @Nonnull final String networkInstanceName, @Nonnull final ClusterSingletonServiceProvider provider) {
+        @Nonnull final BGPTableTypeRegistryConsumer bgpTableTypeRegistry, @Nonnull final BGPStateConsumer stateCollector,
+        @Nonnull final String networkInstanceName) {
         this.dataBroker = requireNonNull(dataBroker);
         this.bgpTableTypeRegistry = requireNonNull(bgpTableTypeRegistry);
         this.stateCollector = requireNonNull(stateCollector);
         this.networkInstanceIId = InstanceIdentifier.create(NetworkInstances.class)
             .child(NetworkInstance.class, new NetworkInstanceKey(networkInstanceName));
         this.timeout = timeout;
-        this.singletonServiceRegistration = requireNonNull(provider)
-            .registerClusterSingletonService(this);
     }
 
-    @Override
-    public synchronized void instantiateServiceInstance() {
+    public synchronized void init() {
         this.transactionChain = this.dataBroker.createTransactionChain(this);
         final TimerTask task = new TimerTask() {
             @Override
@@ -117,14 +104,14 @@ public final class StateProviderImpl implements TransactionChainListener, Cluste
 
     private synchronized void updateBGPStats(final WriteTransaction wTx) {
         final Set<String> oldStats = new HashSet<>(this.instanceIdentifiersCache.keySet());
-        this.stateCollector.getRibStats()
-            .forEach(bgpStateConsumer -> {
-                final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
-                final List<BGPPeerState> peerStats = this.stateCollector.getPeerStats().stream()
-                    .filter(peerState -> ribId.equals(peerState.getInstanceIdentifier())).collect(Collectors.toList());
-                storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wTx);
-                oldStats.remove(ribId.getKey().getId().getValue());
-            });
+        this.stateCollector.getRibStats().stream().filter(BGPRIBState::isActive).forEach(bgpStateConsumer -> {
+            final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
+            final List<BGPPeerState> peerStats = this.stateCollector.getPeerStats().stream()
+                    .filter(BGPPeerState::isActive).filter(peerState -> ribId.equals(peerState.getInstanceIdentifier()))
+                    .collect(Collectors.toList());
+            storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wTx);
+            oldStats.remove(ribId.getKey().getId().getValue());
+        });
         oldStats.forEach(ribId -> removeStoredOperationalState(ribId, wTx));
     }
 
@@ -153,11 +140,15 @@ public final class StateProviderImpl implements TransactionChainListener, Cluste
     }
 
     @Override
-    public void close() throws Exception {
-        if (this.singletonServiceRegistration != null) {
-            this.singletonServiceRegistration.close();
-            this.singletonServiceRegistration = null;
+    public synchronized void close() throws Exception {
+        this.scheduleTask.cancel(true);
+        if (!this.instanceIdentifiersCache.keySet().isEmpty()) {
+            final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
+            this.instanceIdentifiersCache.keySet().iterator()
+                    .forEachRemaining(ribId -> removeStoredOperationalState(ribId, wTx));
+            wTx.submit().get();
         }
+        this.transactionChain.close();
     }
 
     @Override
@@ -170,20 +161,4 @@ public final class StateProviderImpl implements TransactionChainListener, Cluste
     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
         LOG.debug("Transaction chain {} successful.", chain);
     }
-
-    @Override
-    public synchronized ListenableFuture<Void> closeServiceInstance() {
-        this.scheduleTask.cancel(true);
-        final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
-        this.instanceIdentifiersCache.keySet().iterator()
-            .forEachRemaining(ribId -> removeStoredOperationalState(ribId, wTx));
-        final ListenableFuture<Void> futureDelete = wTx.submit();
-        this.transactionChain.close();
-        return futureDelete;
-    }
-
-    @Override
-    public ServiceGroupIdentifier getIdentifier() {
-        return SERVICE_GROUP_IDENTIFIER;
-    }
 }
index 8d666e8d3fb4657b041ad23208dd83e3cf94496d..d7bc7b4ea094a0901e0e2879e3c20a4fbaa1e7b4 100644 (file)
@@ -6,13 +6,12 @@
                odl:type="pingpong"/>
     <reference id="bgpTableTypeRegistry" interface="org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer"/>
     <reference id="bgpStateProvider" interface="org.opendaylight.protocol.bgp.rib.spi.state.BGPStateConsumer"/>
-    <reference id="clusterSingletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"/>
 
     <odl:clustered-app-config id="bgpStateConfig"
                               binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.state.config.rev161107.BgpStateConfig"/>
 
     <bean id="bgpOpenconfigState" class="org.opendaylight.protocol.bgp.state.StateProviderImpl"
-          destroy-method="close">
+          init-method="init" destroy-method="close">
         <argument ref="dataBroker"/>
         <argument>
             <bean factory-ref="bgpStateConfig" factory-method="getTimer"/>
@@ -20,6 +19,5 @@
         <argument ref="bgpTableTypeRegistry"/>
         <argument ref="bgpStateProvider"/>
         <argument value="global-bgp"/>
-        <argument ref="clusterSingletonServiceProvider"/>
     </bean>
-</blueprint>
\ No newline at end of file
+</blueprint>
index 352460d05e520344ec3026a01730fbc3db848522..c83d177faa706bbe85cd903025f7afdc32f7f5c9 100644 (file)
@@ -29,9 +29,6 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
 import org.opendaylight.protocol.bgp.rib.spi.State;
 import org.opendaylight.protocol.bgp.rib.spi.state.BGPAfiSafiState;
@@ -158,29 +155,14 @@ public class StateProviderImplTest extends AbstractConcurrentDataBrokerTest {
     private BGPGracelfulRestartState bgpGracelfulRestartState;
     @Mock
     private BGPAfiSafiState bgpAfiSafiState;
-    @Mock
-    private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
-    @Mock
-    private ClusterSingletonServiceRegistration singletonServiceRegistration;
 
     private final List<BGPPeerState> bgpPeerStates = new ArrayList<>();
     private final List<BGPRIBState> bgpRibStates = new ArrayList<>();
-    private ClusterSingletonService singletonService;
 
     @Before
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
 
-        Mockito.doAnswer(invocationOnMock -> {
-            this.singletonService = (ClusterSingletonService) invocationOnMock.getArguments()[0];
-            return this.singletonServiceRegistration;
-        }).when(this.clusterSingletonServiceProvider).registerClusterSingletonService(any(ClusterSingletonService.class));
-
-        Mockito.doAnswer(invocationOnMock -> {
-            this.singletonService.closeServiceInstance();
-            return null;
-        }).when(this.singletonServiceRegistration).close();
-
         doReturn(Optional.of(IPV4UNICAST.class))
             .when(this.tableTypeRegistry).getAfiSafiType(eq(TABLES_KEY));
 
@@ -253,10 +235,13 @@ public class StateProviderImplTest extends AbstractConcurrentDataBrokerTest {
     }
 
     @Test
-    public void testStateProvider() throws Exception {
+    public void testActiveStateProvider() throws Exception {
+        doReturn(true).when(this.bgpRibState).isActive();
+        doReturn(true).when(this.bgpPeerState).isActive();
+
         final StateProviderImpl stateProvider = new StateProviderImpl(getDataBroker(), 1, this.tableTypeRegistry,
-            this.stateCollector, "global-bgp", this.clusterSingletonServiceProvider);
-        this.singletonService.instantiateServiceInstance();
+            this.stateCollector, "global-bgp");
+        stateProvider.init();
 
         final Global globalExpected = buildGlobalExpected(0);
         this.bgpRibStates.add(this.bgpRibState);
@@ -329,6 +314,27 @@ public class StateProviderImplTest extends AbstractConcurrentDataBrokerTest {
         stateProvider.close();
     }
 
+    @Test
+    public void testInactiveStateProvider() throws Exception {
+        doReturn(false).when(this.bgpRibState).isActive();
+        doReturn(false).when(this.bgpPeerState).isActive();
+
+        final StateProviderImpl stateProvider = new StateProviderImpl(getDataBroker(), 1, this.tableTypeRegistry,
+            this.stateCollector, "global-bgp");
+        stateProvider.init();
+
+        this.bgpRibStates.add(this.bgpRibState);
+        checkNotPresentOperational(getDataBroker(), this.bgpInstanceIdentifier);
+
+        this.bgpPeerStates.add(this.bgpPeerState);
+        checkNotPresentOperational(getDataBroker(), this.bgpInstanceIdentifier);
+
+        this.bgpRibStates.clear();
+        checkNotPresentOperational(getDataBroker(), this.bgpInstanceIdentifier);
+
+        stateProvider.close();
+    }
+
     private static BgpNeighborStateAugmentation buildBgpNeighborStateAugmentation() {
         final BgpNeighborStateAugmentation augmentation = new BgpNeighborStateAugmentationBuilder()
                 .setMessages(new MessagesBuilder().setReceived(new ReceivedBuilder()
@@ -434,4 +440,4 @@ public class StateProviderImplTest extends AbstractConcurrentDataBrokerTest {
                     .build()).build())
             .build();
     }
-}
\ No newline at end of file
+}
index 61e056a00bf9eef5240551723f17a7062f39b284..9ed6cbd01ea86667b20cbe7dca7c811c76c2c168 100644 (file)
@@ -116,6 +116,7 @@ public class ApplicationPeer extends BGPPeerStateImpl implements org.opendayligh
 
     public synchronized void instantiateServiceInstance(final DOMDataTreeChangeService dataTreeChangeService,
             final DOMDataTreeIdentifier appPeerDOMId) {
+        setActive(true);
         this.chain = this.rib.createPeerChain(this);
         this.writerChain = this.rib.createPeerChain(this);
 
@@ -257,6 +258,7 @@ public class ApplicationPeer extends BGPPeerStateImpl implements org.opendayligh
     // FIXME ListenableFuture<?> should be used once closeServiceInstance uses wildcard too
     @Override
     public synchronized ListenableFuture<Void> close() {
+        setActive(false);
         if (this.registration != null) {
             this.registration.close();
             this.registration = null;
index 2574ec54318f5af244157f391385283ea0516a49..412edb0e8664da9eeda8650c1a051018cd01a2c4 100644 (file)
@@ -146,6 +146,7 @@ public class BGPPeer extends BGPPeerStateImpl implements BGPSessionListener, Pee
 
     public void instantiateServiceInstance() {
         this.ribWriter = AdjRibInWriter.create(this.rib.getYangRibId(), this.peerRole, this.simpleRoutingPolicy, this.chain);
+        setActive(true);
     }
 
     // FIXME ListenableFuture<?> should be used once closeServiceInstance uses wildcard too
@@ -153,6 +154,7 @@ public class BGPPeer extends BGPPeerStateImpl implements BGPSessionListener, Pee
     public synchronized ListenableFuture<Void> close() {
         final ListenableFuture<Void> future = releaseConnection();
         this.chain.close();
+        setActive(false);
         return future;
     }
 
index 7d36f3ac08c94e8faa0303a14458a0092d69e52c..776405614ea58a45a9496d7f6442f793e21edd32 100755 (executable)
@@ -312,6 +312,7 @@ public final class RIBImpl extends BGPRIBStateImpl implements ClusterSingletonSe
     @Override
     public synchronized void instantiateServiceInstance() {
         this.isServiceInstantiated = true;
+        setActive(true);
         this.domChain = this.domDataBroker.createTransactionChain(this);
         if (this.configurationWriter != null) {
             this.configurationWriter.apply();
@@ -357,6 +358,7 @@ public final class RIBImpl extends BGPRIBStateImpl implements ClusterSingletonSe
         }
         LOG.info("Close RIB Singleton Service {}, RIB {}", getIdentifier().getValue(), this.ribId.getValue());
         this.isServiceInstantiated = false;
+        setActive(false);
 
         this.txChainToLocRibWriter.values().forEach(LocRibWriter::close);
         this.txChainToLocRibWriter.clear();
index a005f92df818e79f9060e35bc61ede767e27b20e..a53ca99962df9e05de20424b641e7a9e90203838 100644 (file)
@@ -54,6 +54,8 @@ public abstract class BGPPeerStateImpl extends DefaultRibReference implements BG
     private final LongAdder notificationReceivedCounter = new LongAdder();
     private final LongAdder erroneousUpdate = new LongAdder();
     private final String groupId;
+    @GuardedBy("this")
+    private boolean active;
 
     @GuardedBy("this")
     private final Map<TablesKey, PrefixesSentCounters> prefixesSent = new HashMap<>();
@@ -256,4 +258,13 @@ public abstract class BGPPeerStateImpl extends DefaultRibReference implements BG
             this.updateReceivedCounter.increment();
         }
     }
+
+    @Override
+    public final synchronized boolean isActive() {
+        return this.active;
+    }
+
+    protected final synchronized void setActive(final boolean active) {
+        this.active = active;
+    }
 }
index e723b15f117811e49b7b584972b6bf44f56b42d4..07b76cf237a786430085aa88e3a0c895e1298897 100644 (file)
@@ -35,6 +35,8 @@ public class BGPRIBStateImpl extends DefaultRibReference implements BGPRIBState,
     private final Map<TablesKey, TotalPathsCounter> totalPaths = new HashMap<>();
     @GuardedBy("this")
     private final Map<TablesKey, TotalPrefixesCounter> totalPrefixes = new HashMap<>();
+    @GuardedBy("this")
+    private boolean active;
 
     protected BGPRIBStateImpl(final KeyedInstanceIdentifier<Rib, RibKey> instanceIdentifier,
         @Nonnull final BgpId routeId, @Nonnull final AsNumber localAs) {
@@ -95,6 +97,15 @@ public class BGPRIBStateImpl extends DefaultRibReference implements BGPRIBState,
         this.totalPrefixes.put(key, totalPrefixesCounter);
     }
 
+    @Override
+    public final synchronized boolean isActive() {
+        return this.active;
+    }
+
+    protected final synchronized void setActive(final boolean active) {
+        this.active = active;
+    }
+
     @Override
     public final BGPRIBState getRIBState() {
         return this;
index 05c03270979aa7990738a8fa3495c7f475afa63b..4f02eb38570d68473a7f26dbc33743bddb871da9 100644 (file)
@@ -24,6 +24,13 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.
  * - Per-AFI-SAFI operational state for BGP graceful-restart
  */
 public interface BGPPeerState extends RibReference {
+    /**
+     * Indicates whether this instance is being actively managed and updated
+     *
+     * @return active
+     */
+    boolean isActive();
+
     /**
      * PeerGroup Id
      *
index e5450254f091c7f50b82221346064a486e3330b8..0e5acf12b5376e86b05249022597ab5b04a32157 100644 (file)
@@ -22,6 +22,13 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.type
  * Total Paths / Total Prefixes counters, representing the paths / prefixes installed on Loc-rib
  */
 public interface BGPRIBState extends RibReference {
+    /**
+     * Indicates whether this instance is being actively managed and updated
+     *
+     * @return active
+     */
+    boolean isActive();
+
     /**
      * Prefixes count per tablesKey Type
      *