package org.opendaylight.protocol.bgp.state;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.HashMap;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
import org.opendaylight.protocol.bgp.rib.spi.state.BGPPeerState;
import org.opendaylight.protocol.bgp.rib.spi.state.BGPRIBState;
import org.slf4j.LoggerFactory;
@ThreadSafe
-public final class StateProviderImpl implements TransactionChainListener, AutoCloseable {
+public final class StateProviderImpl implements TransactionChainListener, ClusterSingletonService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(StateProviderImpl.class);
-
- private final BindingTransactionChain transactionChain;
+ private static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier
+ .create("bgp-state-provider-service-group");
private final BGPStateConsumer stateCollector;
private final BGPTableTypeRegistryConsumer bgpTableTypeRegistry;
private final KeyedInstanceIdentifier<NetworkInstance, NetworkInstanceKey> networkInstanceIId;
private final int timeout;
+ private final DataBroker dataBroker;
@GuardedBy("this")
private final Map<String, InstanceIdentifier<Bgp>> instanceIdentifiersCache = new HashMap<>();
+ private ClusterSingletonServiceRegistration singletonServiceRegistration;
+ @GuardedBy("this")
+ private BindingTransactionChain transactionChain;
@GuardedBy("this")
private ScheduledFuture<?> scheduleTask;
public StateProviderImpl(@Nonnull final DataBroker dataBroker, final int timeout,
- @Nonnull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
- @Nonnull final BGPStateConsumer stateCollector, @Nonnull final String networkInstanceName) {
- this.transactionChain = Preconditions.checkNotNull(dataBroker).createTransactionChain(this);
+ @Nonnull BGPTableTypeRegistryConsumer bgpTableTypeRegistry, @Nonnull final BGPStateConsumer stateCollector,
+ @Nonnull final String networkInstanceName, @Nonnull final ClusterSingletonServiceProvider provider) {
+ this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.bgpTableTypeRegistry = Preconditions.checkNotNull(bgpTableTypeRegistry);
this.stateCollector = Preconditions.checkNotNull(stateCollector);
this.networkInstanceIId = InstanceIdentifier.create(NetworkInstances.class)
.child(NetworkInstance.class, new NetworkInstanceKey(networkInstanceName));
this.timeout = timeout;
+ this.singletonServiceRegistration = Preconditions.checkNotNull(provider)
+ .registerClusterSingletonService(this);
}
- public synchronized void init() {
+ @Override
+ public synchronized void instantiateServiceInstance() {
+ this.transactionChain = this.dataBroker.createTransactionChain(this);
final TimerTask task = new TimerTask() {
@Override
public void run() {
}
};
- this.scheduleTask = GlobalEventExecutor.INSTANCE.scheduleAtFixedRate(task, 0, this.timeout, TimeUnit.SECONDS);
+ this.scheduleTask = GlobalEventExecutor.INSTANCE.scheduleAtFixedRate(task, 0, this.timeout,
+ TimeUnit.SECONDS);
}
private synchronized void updateBGPStats(final WriteTransaction wTx) {
final Neighbors neighbors = NeighborUtil.buildNeighbors(peerStats, this.bgpTableTypeRegistry);
InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.get(ribId);
if (bgpIID == null) {
- final ProtocolKey protocolKey = new ProtocolKey(BGP.class, bgpStateConsumer.getInstanceIdentifier().getKey().getId().getValue());
+ final ProtocolKey protocolKey = new ProtocolKey(BGP.class, bgpStateConsumer.getInstanceIdentifier()
+ .getKey().getId().getValue());
final KeyedInstanceIdentifier<Protocol, ProtocolKey> protocolIId = this.networkInstanceIId
.child(Protocols.class).child(Protocol.class, protocolKey);
bgpIID = protocolIId.augmentation(Protocol1.class).child(Bgp.class);
}
@Override
- public synchronized void close() throws Exception {
- this.scheduleTask.cancel(true);
- final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
- this.instanceIdentifiersCache.keySet().forEach(ribId -> removeStoredOperationalState(ribId, wTx));
- wTx.submit();
- this.transactionChain.close();
+ public void close() throws Exception {
+ if (this.singletonServiceRegistration != null) {
+ this.singletonServiceRegistration.close();
+ this.singletonServiceRegistration = null;
+ }
}
@Override
- public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+ public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
+ final Throwable cause) {
LOG.error("Transaction chain failed {}.", transaction != null ? transaction.getIdentifier() : null, cause);
}
public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
LOG.debug("Transaction chain {} successful.", chain);
}
+
+ @Override
+ public synchronized ListenableFuture<Void> closeServiceInstance() {
+ this.scheduleTask.cancel(true);
+ final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
+ this.instanceIdentifiersCache.keySet().forEach(ribId -> removeStoredOperationalState(ribId, wTx));
+ final CheckedFuture<Void, TransactionCommitFailedException> futureDelete = wTx.submit();
+ this.transactionChain.close();
+ return futureDelete;
+ }
+
+ @Override
+ public ServiceGroupIdentifier getIdentifier() {
+ return SERVICE_GROUP_IDENTIFIER;
+ }
}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
import org.opendaylight.protocol.bgp.rib.spi.State;
import org.opendaylight.protocol.bgp.rib.spi.state.BGPAfiSafiState;
private BGPGracelfulRestartState bgpGracelfulRestartState;
@Mock
private BGPAfiSafiState bgpAfiSafiState;
+ @Mock
+ private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
+ @Mock
+ private ClusterSingletonServiceRegistration singletonServiceRegistration;
+
private final List<BGPPeerState> bgpPeerStates = new ArrayList<>();
private final List<BGPRIBState> bgpRibStates = new ArrayList<>();
+ private ClusterSingletonService singletonService;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
+
+ Mockito.doAnswer(invocationOnMock -> {
+ this.singletonService = (ClusterSingletonService) invocationOnMock.getArguments()[0];
+ return this.singletonServiceRegistration;
+ }).when(this.clusterSingletonServiceProvider).registerClusterSingletonService(any(ClusterSingletonService.class));
+
+ Mockito.doAnswer(invocationOnMock -> {
+ this.singletonService.closeServiceInstance();
+ return null;
+ }).when(this.singletonServiceRegistration).close();
+
doReturn(Optional.of(IPV4UNICAST.class))
.when(this.tableTypeRegistry).getAfiSafiType(eq(TABLES_KEY));
doReturn(this.as).when(this.bgpRibState).getAs();
doReturn(this.bgpId).when(this.bgpRibState).getRouteId();
- Mockito.doAnswer(new Answer<Long>() {
- @Override
- public Long answer(final InvocationOnMock invocation) throws Throwable {
- return StateProviderImplTest.this.totalPathsCounter.longValue();
- }
- }).when(this.bgpRibState).getTotalPathsCount();
- Mockito.doAnswer(new Answer<Long>() {
- @Override
- public Long answer(final InvocationOnMock invocation) throws Throwable {
- return StateProviderImplTest.this.totalPrefixesCounter.longValue();
- }
- }).when(this.bgpRibState).getTotalPrefixesCount();
- Mockito.doAnswer(new Answer<Long>() {
- @Override
- public Long answer(final InvocationOnMock invocation) throws Throwable {
- return StateProviderImplTest.this.totalPathsCounter.longValue();
- }
- }).when(this.bgpRibState).getPathCount(eq(TABLES_KEY));
- Mockito.doAnswer(new Answer<Long>() {
- @Override
- public Long answer(final InvocationOnMock invocation) throws Throwable {
- return StateProviderImplTest.this.totalPrefixesCounter.longValue();
- }
- }).when(this.bgpRibState).getPrefixesCount(eq(TABLES_KEY));
- Mockito.doAnswer(new Answer<Map<TablesKey, Long>>() {
- @Override
- public Map<TablesKey, Long> answer(final InvocationOnMock invocation) throws Throwable {
- return Collections.singletonMap(TABLES_KEY,
- StateProviderImplTest.this.totalPrefixesCounter.longValue());
- }
- }).when(this.bgpRibState).getPrefixesCount();
- Mockito.doAnswer(new Answer<Map<TablesKey, Long>>() {
- @Override
- public Map<TablesKey, Long> answer(final InvocationOnMock invocation) throws Throwable {
- return Collections.singletonMap(TABLES_KEY,
- StateProviderImplTest.this.totalPathsCounter.longValue());
- }
- }).when(this.bgpRibState).getPathsCount();
+ Mockito.doAnswer(invocation -> this.totalPathsCounter.longValue()).when(this.bgpRibState).getTotalPathsCount();
+ Mockito.doAnswer(invocation -> this.totalPrefixesCounter.longValue()).when(this.bgpRibState).getTotalPrefixesCount();
+ Mockito.doAnswer(invocation -> this.totalPathsCounter.longValue()).when(this.bgpRibState).getPathCount(eq(TABLES_KEY));
+ Mockito.doAnswer(invocation -> this.totalPrefixesCounter.longValue()).when(this.bgpRibState).getPrefixesCount(eq(TABLES_KEY));
+ Mockito.doAnswer(invocation -> Collections.singletonMap(TABLES_KEY,
+ this.totalPrefixesCounter.longValue())).when(this.bgpRibState).getPrefixesCount();
+ Mockito.doAnswer(invocation -> Collections.singletonMap(TABLES_KEY,
+ this.totalPathsCounter.longValue())).when(this.bgpRibState).getPathsCount();
// Mock Peer
doReturn("test-group").when(this.bgpPeerState).getGroupId();
doReturn(iid).when(this.bgpPeerState).getInstanceIdentifier();
- Mockito.doAnswer(new Answer<Long>() {
- @Override
- public Long answer(final InvocationOnMock invocation) throws Throwable {
- return StateProviderImplTest.this.totalPrefixesCounter.longValue();
- }
- }).when(this.bgpPeerState).getTotalPrefixes();
- Mockito.doAnswer(new Answer<Long>() {
- @Override
- public Long answer(final InvocationOnMock invocation) throws Throwable {
- return StateProviderImplTest.this.totalPathsCounter.longValue();
- }
- }).when(this.bgpPeerState).getTotalPathsCount();
+ Mockito.doAnswer(invocation -> this.totalPrefixesCounter.longValue()).when(this.bgpPeerState).getTotalPrefixes();
+ Mockito.doAnswer(invocation -> this.totalPathsCounter.longValue()).when(this.bgpPeerState).getTotalPathsCount();
doReturn(this.neighborAddress).when(this.bgpPeerState).getNeighborAddress();
doReturn(this.bgpSessionState).when(this.bgpPeerState).getBGPSessionState();
doReturn(this.bgpPeerMessagesState).when(this.bgpPeerState).getBGPPeerMessagesState();
doReturn(true).when(this.bgpAfiSafiState).isAfiSafiSupported(any());
doReturn(true).when(this.bgpAfiSafiState).isGracefulRestartAdvertized(any());
doReturn(true).when(this.bgpAfiSafiState).isGracefulRestartReceived(any());
-
-
}
@Test
public void testStateProvider() throws Exception {
final StateProviderImpl stateProvider = new StateProviderImpl(getDataBroker(), 1, this.tableTypeRegistry,
- this.stateCollector, "global-bgp");
- stateProvider.init();
+ this.stateCollector, "global-bgp", this.clusterSingletonServiceProvider);
+ this.singletonService.instantiateServiceInstance();
final Global globalExpected = buildGlobalExpected(0);
this.bgpRibStates.add(this.bgpRibState);
});
this.bgpPeerStates.add(this.bgpPeerState);
- final PeerGroup peerGroupExpected = buildGroupExpected(1L, 1L);
+ final PeerGroup peerGroupExpected = buildGroupExpected();
this.totalPathsCounter.increment();
this.totalPrefixesCounter.increment();
throw lastError;
}
- private static <R, T extends DataObject> R readData(final DataBroker dataBroker, final InstanceIdentifier<T> iid,
+ private static <R, T extends DataObject> void readData(final DataBroker dataBroker, final InstanceIdentifier<T> iid,
final Function<T, R> function)
throws ReadFailedException {
AssertionError lastError = null;
final com.google.common.base.Optional<T> data = tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
if (data.isPresent()) {
try {
- return function.apply(data.get());
+ function.apply(data.get());
+ return;
} catch (final AssertionError e) {
lastError = e;
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
.build()).build();
}
- private PeerGroup buildGroupExpected(final long totalPaths,
- final long totalPrefixes) {
+ private PeerGroup buildGroupExpected() {
return new PeerGroupBuilder().setPeerGroupName("test-group").setState(new org.opendaylight.yang.gen.v1.http
.openconfig.net.yang.bgp.rev151009.bgp.neighbor.group.StateBuilder()
.setSendCommunity(CommunityType.NONE)
.setRouteFlapDamping(false)
.addAugmentation(PeerGroupStateAugmentation.class,
- new PeerGroupStateAugmentationBuilder().setTotalPaths(totalPaths).setTotalPrefixes(totalPrefixes)
+ new PeerGroupStateAugmentationBuilder().setTotalPaths(1L).setTotalPrefixes(1L)
.build()).build())
.build();
}