- StateProviderImpl uses clustering singleton service to run only 1 instance in cluster.
Problem is that if BGP speaker needs to be kept separate across the cluster nodes
e.g. for scalability issues (creating separate shard for bgp rib and openconfig module and
disabling replication for them), since the singleton service name is same across
the nodes, only 1 instance of the service runs and so operational data is available
only on that node.
- This PR changes this to run operational data collection/update service on all nodes,
but perform the activity only for rib and peer that are active on that node.
- Updated unit-tests.
Change-Id: I7284b249b8ec68b4c6905a92bbb221816d196cae
Signed-off-by: Ajay Lele <ajayslele@gmail.com>
import static java.util.Objects.requireNonNull;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.HashMap;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
-import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
import org.opendaylight.protocol.bgp.rib.spi.state.BGPPeerState;
import org.opendaylight.protocol.bgp.rib.spi.state.BGPRIBState;
import org.slf4j.LoggerFactory;
@ThreadSafe
-public final class StateProviderImpl implements TransactionChainListener, ClusterSingletonService, AutoCloseable {
+public final class StateProviderImpl implements TransactionChainListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(StateProviderImpl.class);
- private static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier
- .create("bgp-state-provider-service-group");
private final BGPStateConsumer stateCollector;
private final BGPTableTypeRegistryConsumer bgpTableTypeRegistry;
private final KeyedInstanceIdentifier<NetworkInstance, NetworkInstanceKey> networkInstanceIId;
private final DataBroker dataBroker;
@GuardedBy("this")
private final Map<String, InstanceIdentifier<Bgp>> instanceIdentifiersCache = new HashMap<>();
- private ClusterSingletonServiceRegistration singletonServiceRegistration;
@GuardedBy("this")
private BindingTransactionChain transactionChain;
@GuardedBy("this")
private ScheduledFuture<?> scheduleTask;
public StateProviderImpl(@Nonnull final DataBroker dataBroker, final int timeout,
- @Nonnull BGPTableTypeRegistryConsumer bgpTableTypeRegistry, @Nonnull final BGPStateConsumer stateCollector,
- @Nonnull final String networkInstanceName, @Nonnull final ClusterSingletonServiceProvider provider) {
+ @Nonnull final BGPTableTypeRegistryConsumer bgpTableTypeRegistry, @Nonnull final BGPStateConsumer stateCollector,
+ @Nonnull final String networkInstanceName) {
this.dataBroker = requireNonNull(dataBroker);
this.bgpTableTypeRegistry = requireNonNull(bgpTableTypeRegistry);
this.stateCollector = requireNonNull(stateCollector);
this.networkInstanceIId = InstanceIdentifier.create(NetworkInstances.class)
.child(NetworkInstance.class, new NetworkInstanceKey(networkInstanceName));
this.timeout = timeout;
- this.singletonServiceRegistration = requireNonNull(provider)
- .registerClusterSingletonService(this);
}
- @Override
- public synchronized void instantiateServiceInstance() {
+ public synchronized void init() {
this.transactionChain = this.dataBroker.createTransactionChain(this);
final TimerTask task = new TimerTask() {
@Override
private synchronized void updateBGPStats(final WriteTransaction wTx) {
final Set<String> oldStats = new HashSet<>(this.instanceIdentifiersCache.keySet());
- this.stateCollector.getRibStats()
- .forEach(bgpStateConsumer -> {
- final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
- final List<BGPPeerState> peerStats = this.stateCollector.getPeerStats().stream()
- .filter(peerState -> ribId.equals(peerState.getInstanceIdentifier())).collect(Collectors.toList());
- storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wTx);
- oldStats.remove(ribId.getKey().getId().getValue());
- });
+ this.stateCollector.getRibStats().stream().filter(BGPRIBState::isActive).forEach(bgpStateConsumer -> {
+ final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
+ final List<BGPPeerState> peerStats = this.stateCollector.getPeerStats().stream()
+ .filter(BGPPeerState::isActive).filter(peerState -> ribId.equals(peerState.getInstanceIdentifier()))
+ .collect(Collectors.toList());
+ storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wTx);
+ oldStats.remove(ribId.getKey().getId().getValue());
+ });
oldStats.forEach(ribId -> removeStoredOperationalState(ribId, wTx));
}
}
@Override
- public void close() throws Exception {
- if (this.singletonServiceRegistration != null) {
- this.singletonServiceRegistration.close();
- this.singletonServiceRegistration = null;
+ public synchronized void close() throws Exception {
+ this.scheduleTask.cancel(true);
+ if (!this.instanceIdentifiersCache.keySet().isEmpty()) {
+ final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
+ this.instanceIdentifiersCache.keySet().iterator()
+ .forEachRemaining(ribId -> removeStoredOperationalState(ribId, wTx));
+ wTx.submit().get();
}
+ this.transactionChain.close();
}
@Override
public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
LOG.debug("Transaction chain {} successful.", chain);
}
-
- @Override
- public synchronized ListenableFuture<Void> closeServiceInstance() {
- this.scheduleTask.cancel(true);
- final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
- this.instanceIdentifiersCache.keySet().iterator()
- .forEachRemaining(ribId -> removeStoredOperationalState(ribId, wTx));
- final ListenableFuture<Void> futureDelete = wTx.submit();
- this.transactionChain.close();
- return futureDelete;
- }
-
- @Override
- public ServiceGroupIdentifier getIdentifier() {
- return SERVICE_GROUP_IDENTIFIER;
- }
}
odl:type="pingpong"/>
<reference id="bgpTableTypeRegistry" interface="org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer"/>
<reference id="bgpStateProvider" interface="org.opendaylight.protocol.bgp.rib.spi.state.BGPStateConsumer"/>
- <reference id="clusterSingletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"/>
<odl:clustered-app-config id="bgpStateConfig"
binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.state.config.rev161107.BgpStateConfig"/>
<bean id="bgpOpenconfigState" class="org.opendaylight.protocol.bgp.state.StateProviderImpl"
- destroy-method="close">
+ init-method="init" destroy-method="close">
<argument ref="dataBroker"/>
<argument>
<bean factory-ref="bgpStateConfig" factory-method="getTimer"/>
<argument ref="bgpTableTypeRegistry"/>
<argument ref="bgpStateProvider"/>
<argument value="global-bgp"/>
- <argument ref="clusterSingletonServiceProvider"/>
</bean>
-</blueprint>
\ No newline at end of file
+</blueprint>
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
import org.opendaylight.protocol.bgp.rib.spi.State;
import org.opendaylight.protocol.bgp.rib.spi.state.BGPAfiSafiState;
private BGPGracelfulRestartState bgpGracelfulRestartState;
@Mock
private BGPAfiSafiState bgpAfiSafiState;
- @Mock
- private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
- @Mock
- private ClusterSingletonServiceRegistration singletonServiceRegistration;
private final List<BGPPeerState> bgpPeerStates = new ArrayList<>();
private final List<BGPRIBState> bgpRibStates = new ArrayList<>();
- private ClusterSingletonService singletonService;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
- Mockito.doAnswer(invocationOnMock -> {
- this.singletonService = (ClusterSingletonService) invocationOnMock.getArguments()[0];
- return this.singletonServiceRegistration;
- }).when(this.clusterSingletonServiceProvider).registerClusterSingletonService(any(ClusterSingletonService.class));
-
- Mockito.doAnswer(invocationOnMock -> {
- this.singletonService.closeServiceInstance();
- return null;
- }).when(this.singletonServiceRegistration).close();
-
doReturn(Optional.of(IPV4UNICAST.class))
.when(this.tableTypeRegistry).getAfiSafiType(eq(TABLES_KEY));
}
@Test
- public void testStateProvider() throws Exception {
+ public void testActiveStateProvider() throws Exception {
+ doReturn(true).when(this.bgpRibState).isActive();
+ doReturn(true).when(this.bgpPeerState).isActive();
+
final StateProviderImpl stateProvider = new StateProviderImpl(getDataBroker(), 1, this.tableTypeRegistry,
- this.stateCollector, "global-bgp", this.clusterSingletonServiceProvider);
- this.singletonService.instantiateServiceInstance();
+ this.stateCollector, "global-bgp");
+ stateProvider.init();
final Global globalExpected = buildGlobalExpected(0);
this.bgpRibStates.add(this.bgpRibState);
stateProvider.close();
}
+ @Test
+ public void testInactiveStateProvider() throws Exception {
+ doReturn(false).when(this.bgpRibState).isActive();
+ doReturn(false).when(this.bgpPeerState).isActive();
+
+ final StateProviderImpl stateProvider = new StateProviderImpl(getDataBroker(), 1, this.tableTypeRegistry,
+ this.stateCollector, "global-bgp");
+ stateProvider.init();
+
+ this.bgpRibStates.add(this.bgpRibState);
+ checkNotPresentOperational(getDataBroker(), this.bgpInstanceIdentifier);
+
+ this.bgpPeerStates.add(this.bgpPeerState);
+ checkNotPresentOperational(getDataBroker(), this.bgpInstanceIdentifier);
+
+ this.bgpRibStates.clear();
+ checkNotPresentOperational(getDataBroker(), this.bgpInstanceIdentifier);
+
+ stateProvider.close();
+ }
+
private static BgpNeighborStateAugmentation buildBgpNeighborStateAugmentation() {
final BgpNeighborStateAugmentation augmentation = new BgpNeighborStateAugmentationBuilder()
.setMessages(new MessagesBuilder().setReceived(new ReceivedBuilder()
.build()).build())
.build();
}
-}
\ No newline at end of file
+}
public synchronized void instantiateServiceInstance(final DOMDataTreeChangeService dataTreeChangeService,
final DOMDataTreeIdentifier appPeerDOMId) {
+ setActive(true);
this.chain = this.rib.createPeerChain(this);
this.writerChain = this.rib.createPeerChain(this);
// FIXME ListenableFuture<?> should be used once closeServiceInstance uses wildcard too
@Override
public synchronized ListenableFuture<Void> close() {
+ setActive(false);
if (this.registration != null) {
this.registration.close();
this.registration = null;
public void instantiateServiceInstance() {
this.ribWriter = AdjRibInWriter.create(this.rib.getYangRibId(), this.peerRole, this.simpleRoutingPolicy, this.chain);
+ setActive(true);
}
// FIXME ListenableFuture<?> should be used once closeServiceInstance uses wildcard too
public synchronized ListenableFuture<Void> close() {
final ListenableFuture<Void> future = releaseConnection();
this.chain.close();
+ setActive(false);
return future;
}
@Override
public synchronized void instantiateServiceInstance() {
this.isServiceInstantiated = true;
+ setActive(true);
this.domChain = this.domDataBroker.createTransactionChain(this);
if (this.configurationWriter != null) {
this.configurationWriter.apply();
}
LOG.info("Close RIB Singleton Service {}, RIB {}", getIdentifier().getValue(), this.ribId.getValue());
this.isServiceInstantiated = false;
+ setActive(false);
this.txChainToLocRibWriter.values().forEach(LocRibWriter::close);
this.txChainToLocRibWriter.clear();
private final LongAdder notificationReceivedCounter = new LongAdder();
private final LongAdder erroneousUpdate = new LongAdder();
private final String groupId;
+ @GuardedBy("this")
+ private boolean active;
@GuardedBy("this")
private final Map<TablesKey, PrefixesSentCounters> prefixesSent = new HashMap<>();
this.updateReceivedCounter.increment();
}
}
+
+ @Override
+ public final synchronized boolean isActive() {
+ return this.active;
+ }
+
+ protected final synchronized void setActive(final boolean active) {
+ this.active = active;
+ }
}
private final Map<TablesKey, TotalPathsCounter> totalPaths = new HashMap<>();
@GuardedBy("this")
private final Map<TablesKey, TotalPrefixesCounter> totalPrefixes = new HashMap<>();
+ @GuardedBy("this")
+ private boolean active;
protected BGPRIBStateImpl(final KeyedInstanceIdentifier<Rib, RibKey> instanceIdentifier,
@Nonnull final BgpId routeId, @Nonnull final AsNumber localAs) {
this.totalPrefixes.put(key, totalPrefixesCounter);
}
+ @Override
+ public final synchronized boolean isActive() {
+ return this.active;
+ }
+
+ protected final synchronized void setActive(final boolean active) {
+ this.active = active;
+ }
+
@Override
public final BGPRIBState getRIBState() {
return this;
* - Per-AFI-SAFI operational state for BGP graceful-restart
*/
public interface BGPPeerState extends RibReference {
+ /**
+ * Indicates whether this instance is being actively managed and updated
+ *
+ * @return active
+ */
+ boolean isActive();
+
/**
* PeerGroup Id
*
* Total Paths / Total Prefixes counters, representing the paths / prefixes installed on Loc-rib
*/
public interface BGPRIBState extends RibReference {
+ /**
+ * Indicates whether this instance is being actively managed and updated
+ *
+ * @return active
+ */
+ boolean isActive();
+
/**
* Prefixes count per tablesKey Type
*