2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.protocol.bmp.impl.app;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.ArgumentMatchers.any;
16 import static org.mockito.Mockito.doAnswer;
17 import static org.opendaylight.protocol.bmp.parser.message.TestUtil.createRouteMonMsgWithEndOfRibMarker;
18 import static org.opendaylight.protocol.bmp.parser.message.TestUtil.createRouteMonitMsg;
19 import static org.opendaylight.protocol.util.CheckTestUtil.checkNotPresentOperational;
20 import static org.opendaylight.protocol.util.CheckTestUtil.readDataOperational;
22 import com.google.common.net.InetAddresses;
23 import io.netty.bootstrap.Bootstrap;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelInitializer;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.EventLoopGroup;
29 import io.netty.channel.epoll.Epoll;
30 import io.netty.channel.epoll.EpollEventLoopGroup;
31 import io.netty.channel.epoll.EpollSocketChannel;
32 import io.netty.channel.nio.NioEventLoopGroup;
33 import io.netty.channel.socket.SocketChannel;
34 import io.netty.channel.socket.nio.NioSocketChannel;
35 import java.net.InetSocketAddress;
37 import java.util.concurrent.ExecutionException;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.junit.runner.RunWith;
42 import org.mockito.Mock;
43 import org.mockito.junit.MockitoJUnitRunner;
44 import org.opendaylight.mdsal.binding.dom.adapter.AdapterContext;
45 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest;
46 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractDataBrokerTestCustomizer;
47 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
48 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
49 import org.opendaylight.mdsal.singleton.api.ClusterSingletonService;
50 import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
51 import org.opendaylight.protocol.bgp.inet.RIBActivator;
52 import org.opendaylight.protocol.bgp.parser.impl.BGPActivator;
53 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
54 import org.opendaylight.protocol.bgp.parser.spi.pojo.SimpleBGPExtensionProviderContext;
55 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionProviderContext;
56 import org.opendaylight.protocol.bgp.rib.spi.SimpleRIBExtensionProviderContext;
57 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
58 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
59 import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
60 import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
61 import org.opendaylight.protocol.bmp.impl.spi.BmpMonitoringStation;
62 import org.opendaylight.protocol.bmp.parser.BmpActivator;
63 import org.opendaylight.protocol.bmp.parser.message.TestUtil;
64 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
65 import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
66 import org.opendaylight.protocol.util.CheckUtil;
67 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressNoZone;
68 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4AddressNoZone;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev180329.bmp.monitor.monitor.router.peer.pre.policy.rib.tables.routes.Ipv4RoutesCase;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.PeerId;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.Tables;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.Ipv4AddressFamily;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.UnicastSubsequentAddressFamily;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev200120.AdjRibInType;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev200120.PeerType;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev200120.RouteMirroringMessage;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev200120.StatsReportsMessage;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev200120.stat.Tlvs;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.BmpMonitor;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.MonitorId;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.RouterId;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.Status;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.bmp.monitor.Monitor;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.bmp.monitor.MonitorKey;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.Peer;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.PeerKey;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.peer.Mirrors;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.peer.PeerSession;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.peer.PostPolicyRib;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.peer.PrePolicyRib;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.peer.Stats;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.routers.Router;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.routers.RouterKey;
94 import org.opendaylight.yangtools.concepts.Registration;
95 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
96 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
97 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
98 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
99 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
100 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
102 @RunWith(MockitoJUnitRunner.StrictStubs.class)
103 public class BmpMonitorImplTest extends AbstractConcurrentDataBrokerTest {
104 // the local port and address where the monitor (ODL) will listen for incoming BMP request
105 private static final int MONITOR_LOCAL_PORT = 12345;
106 private static final String MONITOR_LOCAL_ADDRESS = "127.0.0.10";
107 private static final String MONITOR_LOCAL_ADDRESS_2 = "127.0.0.11";
108 // the router (monitee) address where we are going to simulate a BMP request from
109 private static final String REMOTE_ROUTER_ADDRESS_1 = "127.0.0.12";
110 private static final String REMOTE_ROUTER_ADDRESS_2 = "127.0.0.13";
111 private static final Ipv4AddressNoZone PEER1 = new Ipv4AddressNoZone("20.20.20.20");
112 private static final MonitorId MONITOR_ID = new MonitorId("monitor");
113 private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID =
114 InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
115 private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
116 private static final InstanceIdentifier<BmpMonitor> BMP_II = InstanceIdentifier.create(BmpMonitor.class);
117 private AdapterContext mappingService;
118 private final RIBActivator ribActivator = new RIBActivator();
119 private final BGPActivator bgpActivator = new BGPActivator();
120 private BmpActivator bmpActivator;
121 private BmpDispatcherImpl dispatcher;
122 private BmpMonitoringStation bmpApp;
123 private BmpMessageRegistry msgRegistry;
124 private final RIBExtensionProviderContext ribExtension = new SimpleRIBExtensionProviderContext();
125 private ClusterSingletonService singletonService;
126 private ClusterSingletonService singletonService2;
128 private Registration singletonServiceRegistration;
130 private Registration singletonServiceRegistration2;
132 private ClusterSingletonServiceProvider clusterSSProv;
134 private ClusterSingletonServiceProvider clusterSSProv2;
// Test fixture set-up: stubs the two cluster-singleton providers, starts the RIB/BGP/BMP
// extension activators, creates the BMP dispatcher, seeds the operational datastore with a
// BmpMonitor container and creates the monitoring station under test.
// NOTE(review): the embedded listing numbers skip in several places (e.g. 138-139, 170-171,
// 177-178, 180-181, 191-193), so some lines of this method are not visible in this copy —
// restore them from the upstream file before editing the code itself.
137 public void setUp() throws Exception {
// Registering a singleton service captures it, activates it immediately and hands back the
// registration mock.
140 doAnswer(invocationOnMock -> {
141 BmpMonitorImplTest.this.singletonService = (ClusterSingletonService) invocationOnMock.getArguments()[0];
142 singletonService.instantiateServiceInstance();
143 return BmpMonitorImplTest.this.singletonServiceRegistration;
144 }).when(clusterSSProv).registerClusterSingletonService(any(ClusterSingletonService.class));
// Closing the registration closes the captured service instance.
146 doAnswer(invocationOnMock -> BmpMonitorImplTest.this.singletonService.closeServiceInstance())
147 .when(singletonServiceRegistration).close();
// Same wiring for the second provider/registration pair (used by deploySecondInstance()).
149 doAnswer(invocationOnMock -> {
150 singletonService2 = (ClusterSingletonService) invocationOnMock.getArguments()[0];
151 singletonService2.instantiateServiceInstance();
152 return BmpMonitorImplTest.this.singletonServiceRegistration2;
153 }).when(clusterSSProv2).registerClusterSingletonService(any(ClusterSingletonService.class));
155 doAnswer(invocationOnMock -> BmpMonitorImplTest.this.singletonService2.closeServiceInstance())
156 .when(singletonServiceRegistration2).close();
// Start the extension providers; the BMP activator is built on top of the BGP context.
158 ribActivator.startRIBExtensionProvider(ribExtension, mappingService.currentSerializer());
160 final BGPExtensionProviderContext context = new SimpleBGPExtensionProviderContext();
161 bgpActivator.start(context);
162 final SimpleBmpExtensionProviderContext ctx = new SimpleBmpExtensionProviderContext();
163 bmpActivator = new BmpActivator(context);
164 bmpActivator.start(ctx);
// The same message registry is reused by the simulated router clients.
165 msgRegistry = ctx.getBmpMessageRegistry();
167 dispatcher = new BmpDispatcherImpl(new BmpNettyGroups(), ctx, new DefaultBmpSessionFactory());
// Address the monitoring station is created with (continuation line not visible in this copy).
169 final InetSocketAddress inetAddress = new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS),
// Merge a BmpMonitor container holding a Monitor map node into the operational datastore.
172 final DOMDataTreeWriteTransaction wTx = getDomBroker().newWriteOnlyTransaction();
173 final ContainerNode parentNode = ImmutableNodes.newContainerBuilder()
174 .withNodeIdentifier(new NodeIdentifier(BmpMonitor.QNAME))
175 .addChild(ImmutableNodes.newSystemMapBuilder()
176 .withNodeIdentifier(new NodeIdentifier(Monitor.QNAME))
179 wTx.merge(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(BmpMonitor.QNAME), parentNode);
182 bmpApp = new BmpMonitoringStationImpl(getDomBroker(), dispatcher, ribExtension,
183 mappingService.currentSerializer(), clusterSSProv, MONITOR_ID, inetAddress, null);
// Expect exactly one monitor, carrying MONITOR_ID and no routers yet.
184 readDataOperational(getDataBroker(), BMP_II, monitor -> {
185 assertEquals(1, monitor.nonnullMonitor().size());
186 final Monitor bmpMonitor = monitor.nonnullMonitor().values().iterator().next();
187 assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
188 assertEquals(0, bmpMonitor.nonnullRouter().size());
// NOTE(review): the next two assertions repeat the two directly above verbatim; one pair can go.
189 assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
190 assertEquals(0, bmpMonitor.nonnullRouter().size());
196 protected final AbstractDataBrokerTestCustomizer createDataBrokerTestCustomizer() {
197 final AbstractDataBrokerTestCustomizer customizer = super.createDataBrokerTestCustomizer();
198 mappingService = customizer.getAdapterContext();
// Test fixture tear-down. The only statement visible in this copy verifies that the BmpMonitor
// container is no longer present in the operational datastore.
// NOTE(review): listing lines 204-206 are not visible here; presumably they close the resources
// created in setUp() — confirm against the upstream file.
203 public void tearDown() throws Exception {
207 checkNotPresentOperational(getDataBroker(), BMP_II);
// Scenario test with a 20 s timeout: two simulated routers connect and are monitored, a
// redundant connection from router 1 is expected to be closed, router 1 reconnects after its
// first channel is closed, and finally no router is left in the datastore once all channels
// are closed.
// NOTE(review): the embedded listing numbers skip after each lambda's last assertion and at
// 228-229 and 251-252, so the lambda endings and possibly other short statements are not
// visible in this copy.
210 @Test(timeout = 20000)
211 public void testRouterMonitoring() throws Exception {
212 // first test if a single router monitoring is working
213 final Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
214 readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
215 assertEquals(1, monitor.nonnullRouter().size());
// a second router, connecting from a different address, is tracked alongside the first
219 final Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
220 readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
221 assertEquals(2, monitor.nonnullRouter().size());
225 // initiate another BMP request from router 1, create a redundant connection
226 // we expect the connection to be closed
227 final Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, msgRegistry);
230 // channel 1 should still be open, while channel3 should be closed
231 CheckUtil.checkEquals(() -> assertTrue(channel1.isOpen()));
232 CheckUtil.checkEquals(() -> assertFalse(channel3.isOpen()));
234 // now if we close the channel 1 and try it again, it should succeed
235 channel1.close().sync();
237 // channel 2 is still open
238 readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
239 assertEquals(1, monitor.nonnullRouter().size());
// router 1 reconnects; the full message exchange of testMonitoringStation() runs again
243 final Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
244 readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
245 assertEquals(2, monitor.nonnullRouter().size());
249 // close all channel altogether
250 channel2.close().sync();
253 // sleep for a while to avoid intermittent InMemoryDataTree modification conflict
254 channel4.close().sync();
// with every channel closed the monitor must not list any router
256 readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
257 assertNull(monitor.getRouter());
262 private static void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
263 channelFuture.sync();
266 private Channel testMonitoringStation(final String remoteRouterIpAddr) throws InterruptedException,
268 final Channel channel = connectTestClient(remoteRouterIpAddr, msgRegistry);
269 final RouterId routerId = getRouterId(remoteRouterIpAddr);
271 readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
272 // now find the current router instance
273 Router router = null;
274 for (final Router r : monitor.nonnullRouter().values()) {
275 if (routerId.equals(r.getRouterId())) {
280 assertNotNull(router);
281 assertEquals(Status.Down, router.getStatus());
282 assertNull(router.getPeer());
286 waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil
287 .createInitMsg("description", "name", "some info")));
289 readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
290 Router retRouter = null;
291 for (final Router r : monitor.nonnullRouter().values()) {
292 if (routerId.equals(r.getRouterId())) {
298 assertNotNull(retRouter);
299 assertEquals("some info;", retRouter.getInfo());
300 assertEquals("name", retRouter.getName());
301 assertEquals("description", retRouter.getDescription());
302 assertEquals(routerId, retRouter.getRouterId());
303 assertNull(retRouter.getPeer());
304 assertEquals(Status.Up, retRouter.getStatus());
308 waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
309 final KeyedInstanceIdentifier<Router, RouterKey> routerIId =
310 MONITOR_IID.child(Router.class, new RouterKey(routerId));
312 readDataOperational(getDataBroker(), routerIId, router -> {
313 final Map<PeerKey, Peer> peers = router.getPeer();
314 assertNotNull(peers);
315 assertEquals(1, peers.size());
316 final Peer peer = peers.values().iterator().next();
317 assertEquals(PeerType.Global, peer.getType());
318 assertEquals(PEER_ID, peer.getPeerId());
319 assertEquals(PEER1, peer.getBgpId());
320 assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4AddressNoZone());
321 assertEquals(TestUtil.PEER_AS, peer.getAs());
322 assertNull(peer.getPeerDistinguisher());
323 assertNull(peer.getStats());
325 assertNotNull(peer.getPrePolicyRib());
326 assertEquals(1, peer.getPrePolicyRib().nonnullTables().size());
327 final Tables prePolicyTable = peer.getPrePolicyRib().nonnullTables().values().iterator().next();
328 assertEquals(Ipv4AddressFamily.VALUE, prePolicyTable.getAfi());
329 assertEquals(UnicastSubsequentAddressFamily.VALUE, prePolicyTable.getSafi());
330 assertFalse(prePolicyTable.getAttributes().getUptodate());
332 assertNotNull(peer.getPostPolicyRib());
333 assertEquals(1, peer.getPostPolicyRib().nonnullTables().size());
334 final Tables postPolicyTable = peer.getPrePolicyRib().nonnullTables().values().iterator().next();
335 assertEquals(Ipv4AddressFamily.VALUE, postPolicyTable.getAfi());
336 assertEquals(UnicastSubsequentAddressFamily.VALUE, postPolicyTable.getSafi());
337 assertFalse(postPolicyTable.getAttributes().getUptodate());
339 assertNotNull(peer.getPeerSession());
340 final PeerSession peerSession = peer.getPeerSession();
341 assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4AddressNoZone());
342 assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
343 assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
344 assertEquals(Status.Up, peerSession.getStatus());
345 assertNotNull(peerSession.getReceivedOpen());
346 assertNotNull(peerSession.getSentOpen());
351 final StatsReportsMessage statsMsg = TestUtil.createStatsReportMsg(PEER1);
352 waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
353 final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
355 readDataOperational(getDataBroker(), peerIId.child(Stats.class), peerStats -> {
356 assertNotNull(peerStats.getTimestampSec());
357 final Tlvs tlvs = statsMsg.getTlvs();
358 assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
359 assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(),
360 peerStats.getDuplicatePrefixAdvertisements());
361 assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
362 assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
363 assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
364 assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(),
365 peerStats.getInvalidatedClusterListLoop());
366 assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
367 assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
368 assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
369 assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(),
370 peerStats.getPerAfiSafiAdjRibInRoutes().nonnullAfiSafi().values().iterator().next().getCount()
372 assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(),
373 peerStats.getPerAfiSafiLocRibRoutes().nonnullAfiSafi().values().iterator().next().getCount()
378 // route mirror message test
379 final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
380 waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
382 readDataOperational(getDataBroker(), peerIId.child(Mirrors.class), routeMirrors -> {
383 assertNotNull(routeMirrors.getTimestampSec());
387 waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonitMsg(false, PEER1,
388 AdjRibInType.PrePolicy)));
389 waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonMsgWithEndOfRibMarker(PEER1,
390 AdjRibInType.PrePolicy)));
392 readDataOperational(getDataBroker(), peerIId.child(PrePolicyRib.class), prePolicyRib -> {
393 assertFalse(prePolicyRib.nonnullTables().isEmpty());
394 final Tables tables = prePolicyRib.nonnullTables().values().iterator().next();
395 assertTrue(tables.getAttributes().getUptodate());
396 assertEquals(3, ((Ipv4RoutesCase) tables.getRoutes()).getIpv4Routes().nonnullIpv4Route().size());
400 waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonitMsg(false, PEER1,
401 AdjRibInType.PostPolicy)));
402 waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonMsgWithEndOfRibMarker(PEER1,
403 AdjRibInType.PostPolicy)));
405 readDataOperational(getDataBroker(), peerIId.child(PostPolicyRib.class), postPolicyRib -> {
406 assertFalse(postPolicyRib.nonnullTables().isEmpty());
407 final Tables tables = postPolicyRib.nonnullTables().values().iterator().next();
408 assertTrue(tables.getAttributes().getUptodate());
409 assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet
410 .rev180329.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase)
411 tables.getRoutes()).getIpv4Routes().nonnullIpv4Route().size());
415 waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
417 readDataOperational(getDataBroker(), routerIId, router -> {
418 assertNull(router.getPeer());
426 public void deploySecondInstance() throws Exception {
427 try (var monitoringStation2 = new BmpMonitoringStationImpl(getDomBroker(), dispatcher,
428 ribExtension, mappingService.currentSerializer(), clusterSSProv2, new MonitorId("monitor2"),
429 new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), null)) {
431 readDataOperational(getDataBroker(), BMP_II, monitor -> {
432 assertEquals(2, monitor.nonnullMonitor().size());
438 private static Channel connectTestClient(final String routerIp, final BmpMessageRegistry msgRegistry)
439 throws InterruptedException {
440 final BmpHandlerFactory hf = new BmpHandlerFactory(msgRegistry);
441 final Bootstrap b = new Bootstrap();
442 final EventLoopGroup workerGroup;
443 if (Epoll.isAvailable()) {
444 b.channel(EpollSocketChannel.class);
445 workerGroup = new EpollEventLoopGroup();
447 b.channel(NioSocketChannel.class);
448 workerGroup = new NioEventLoopGroup();
450 b.group(workerGroup);
451 b.option(ChannelOption.SO_KEEPALIVE, true);
452 b.handler(new ChannelInitializer<SocketChannel>() {
454 protected void initChannel(final SocketChannel ch) {
455 ch.pipeline().addLast(hf.getDecoders());
456 ch.pipeline().addLast(hf.getEncoders());
459 b.localAddress(new InetSocketAddress(routerIp, 0));
460 b.option(ChannelOption.SO_REUSEADDR, true);
461 final ChannelFuture future = b.connect(new InetSocketAddress(MONITOR_LOCAL_ADDRESS, MONITOR_LOCAL_PORT)).sync();
463 return future.channel();
466 private static RouterId getRouterId(final String routerIp) {
467 return new RouterId(new IpAddressNoZone(new Ipv4AddressNoZone(routerIp)));