5b5b08e2c5bb5274c8bfe48275135f690f08713f
[bgpcep.git] / bmp / bmp-impl / src / test / java / org / opendaylight / protocol / bmp / impl / app / BmpMonitorImplTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bmp.impl.app;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.ArgumentMatchers.any;
16 import static org.mockito.Mockito.doAnswer;
17 import static org.opendaylight.protocol.bmp.parser.message.TestUtil.createRouteMonMsgWithEndOfRibMarker;
18 import static org.opendaylight.protocol.bmp.parser.message.TestUtil.createRouteMonitMsg;
19 import static org.opendaylight.protocol.util.CheckTestUtil.checkNotPresentOperational;
20 import static org.opendaylight.protocol.util.CheckTestUtil.readDataOperational;
21
22 import com.google.common.net.InetAddresses;
23 import io.netty.bootstrap.Bootstrap;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelInitializer;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.EventLoopGroup;
29 import io.netty.channel.epoll.Epoll;
30 import io.netty.channel.epoll.EpollEventLoopGroup;
31 import io.netty.channel.epoll.EpollSocketChannel;
32 import io.netty.channel.nio.NioEventLoopGroup;
33 import io.netty.channel.socket.SocketChannel;
34 import io.netty.channel.socket.nio.NioSocketChannel;
35 import java.net.InetSocketAddress;
36 import java.util.Map;
37 import java.util.concurrent.ExecutionException;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.junit.runner.RunWith;
42 import org.mockito.Mock;
43 import org.mockito.junit.MockitoJUnitRunner;
44 import org.opendaylight.mdsal.binding.dom.adapter.AdapterContext;
45 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest;
46 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractDataBrokerTestCustomizer;
47 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
48 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
49 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
50 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
51 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
52 import org.opendaylight.protocol.bgp.inet.RIBActivator;
53 import org.opendaylight.protocol.bgp.parser.impl.BGPActivator;
54 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
55 import org.opendaylight.protocol.bgp.parser.spi.pojo.SimpleBGPExtensionProviderContext;
56 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionProviderContext;
57 import org.opendaylight.protocol.bgp.rib.spi.SimpleRIBExtensionProviderContext;
58 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
59 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
60 import org.opendaylight.protocol.bmp.impl.BmpNettyGroups;
61 import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
62 import org.opendaylight.protocol.bmp.impl.spi.BmpMonitoringStation;
63 import org.opendaylight.protocol.bmp.parser.BmpActivator;
64 import org.opendaylight.protocol.bmp.parser.message.TestUtil;
65 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
66 import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
67 import org.opendaylight.protocol.util.CheckUtil;
68 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddressNoZone;
69 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4AddressNoZone;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev180329.bmp.monitor.monitor.router.peer.pre.policy.rib.tables.routes.Ipv4RoutesCase;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.PeerId;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.Tables;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.Ipv4AddressFamily;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.UnicastSubsequentAddressFamily;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev200120.AdjRibInType;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev200120.PeerType;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev200120.RouteMirroringMessage;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev200120.StatsReportsMessage;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev200120.stat.Tlvs;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.BmpMonitor;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.MonitorId;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.RouterId;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.Status;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.bmp.monitor.Monitor;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.bmp.monitor.MonitorKey;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.Peer;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.PeerKey;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.peer.Mirrors;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.peer.PeerSession;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.peer.PostPolicyRib;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.peer.PrePolicyRib;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.peers.peer.Stats;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.routers.Router;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev200120.routers.RouterKey;
95 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
96 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
97 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
98 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
99 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
100 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
101 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
102
103 @RunWith(MockitoJUnitRunner.StrictStubs.class)
104 public class BmpMonitorImplTest extends AbstractConcurrentDataBrokerTest {
105     // the local port and address where the monitor (ODL) will listen for incoming BMP request
106     private static final int MONITOR_LOCAL_PORT = 12345;
107     private static final String MONITOR_LOCAL_ADDRESS = "127.0.0.10";
108     private static final String MONITOR_LOCAL_ADDRESS_2 = "127.0.0.11";
109     // the router (monitee) address where we are going to simulate a BMP request from
110     private static final String REMOTE_ROUTER_ADDRESS_1 = "127.0.0.12";
111     private static final String REMOTE_ROUTER_ADDRESS_2 = "127.0.0.13";
112     private static final Ipv4AddressNoZone PEER1 = new Ipv4AddressNoZone("20.20.20.20");
113     private static final MonitorId MONITOR_ID = new MonitorId("monitor");
114     private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID = InstanceIdentifier
115         .create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
116     private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
117     private static final InstanceIdentifier<BmpMonitor> BMP_II = InstanceIdentifier.create(BmpMonitor.class);
118     private AdapterContext mappingService;
119     private final RIBActivator ribActivator = new RIBActivator();
120     private final BGPActivator bgpActivator = new BGPActivator();
121     private BmpActivator bmpActivator;
122     private BmpDispatcherImpl dispatcher;
123     private BmpMonitoringStation bmpApp;
124     private BmpMessageRegistry msgRegistry;
125     private final RIBExtensionProviderContext ribExtension = new SimpleRIBExtensionProviderContext();
126     private ClusterSingletonService singletonService;
127     private ClusterSingletonService singletonService2;
128     @Mock
129     private ClusterSingletonServiceRegistration singletonServiceRegistration;
130     @Mock
131     private ClusterSingletonServiceRegistration singletonServiceRegistration2;
132     @Mock
133     private ClusterSingletonServiceProvider clusterSSProv;
134     @Mock
135     private ClusterSingletonServiceProvider clusterSSProv2;
136
137     @Before
138     public void setUp() throws Exception {
139         super.setup();
140
141         doAnswer(invocationOnMock -> {
142             BmpMonitorImplTest.this.singletonService = (ClusterSingletonService) invocationOnMock.getArguments()[0];
143             singletonService.instantiateServiceInstance();
144             return BmpMonitorImplTest.this.singletonServiceRegistration;
145         }).when(clusterSSProv).registerClusterSingletonService(any(ClusterSingletonService.class));
146
147         doAnswer(invocationOnMock -> BmpMonitorImplTest.this.singletonService.closeServiceInstance())
148             .when(singletonServiceRegistration).close();
149
150         doAnswer(invocationOnMock -> {
151             singletonService2 = (ClusterSingletonService) invocationOnMock.getArguments()[0];
152             singletonService2.instantiateServiceInstance();
153             return BmpMonitorImplTest.this.singletonServiceRegistration2;
154         }).when(clusterSSProv2).registerClusterSingletonService(any(ClusterSingletonService.class));
155
156         doAnswer(invocationOnMock -> BmpMonitorImplTest.this.singletonService2.closeServiceInstance())
157             .when(singletonServiceRegistration2).close();
158
159         ribActivator.startRIBExtensionProvider(ribExtension, mappingService.currentSerializer());
160
161         final BGPExtensionProviderContext context = new SimpleBGPExtensionProviderContext();
162         bgpActivator.start(context);
163         final SimpleBmpExtensionProviderContext ctx = new SimpleBmpExtensionProviderContext();
164         bmpActivator = new BmpActivator(context);
165         bmpActivator.start(ctx);
166         msgRegistry = ctx.getBmpMessageRegistry();
167
168         dispatcher = new BmpDispatcherImpl(new BmpNettyGroups(), ctx, new DefaultBmpSessionFactory());
169
170         final InetSocketAddress inetAddress = new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS),
171             MONITOR_LOCAL_PORT);
172
173         final DOMDataTreeWriteTransaction wTx = getDomBroker().newWriteOnlyTransaction();
174         final ContainerNode parentNode = Builders.containerBuilder().withNodeIdentifier(
175                 new NodeIdentifier(BmpMonitor.QNAME))
176                 .addChild(ImmutableNodes.mapNodeBuilder(Monitor.QNAME).build()).build();
177         wTx.merge(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(BmpMonitor.QNAME), parentNode);
178         wTx.commit().get();
179
180         bmpApp = new BmpMonitoringStationImpl(getDomBroker(), dispatcher, ribExtension,
181             mappingService.currentSerializer(), clusterSSProv, MONITOR_ID, inetAddress, null);
182         readDataOperational(getDataBroker(), BMP_II, monitor -> {
183             assertEquals(1, monitor.nonnullMonitor().size());
184             final Monitor bmpMonitor = monitor.nonnullMonitor().values().iterator().next();
185             assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
186             assertEquals(0, bmpMonitor.nonnullRouter().size());
187             assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
188             assertEquals(0, bmpMonitor.nonnullRouter().size());
189             return monitor;
190         });
191     }
192
193     @Override
194     protected final AbstractDataBrokerTestCustomizer createDataBrokerTestCustomizer() {
195         final AbstractDataBrokerTestCustomizer customizer = super.createDataBrokerTestCustomizer();
196         mappingService = customizer.getAdapterContext();
197         return customizer;
198     }
199
200     @After
201     public void tearDown() throws Exception {
202         dispatcher.close();
203         bmpApp.close();
204
205         checkNotPresentOperational(getDataBroker(), BMP_II);
206     }
207
208     @Test(timeout = 20000)
209     public void testRouterMonitoring() throws Exception {
210         // first test if a single router monitoring is working
211         final Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
212         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
213             assertEquals(1, monitor.nonnullRouter().size());
214             return monitor;
215         });
216
217         final Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
218         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
219             assertEquals(2, monitor.nonnullRouter().size());
220             return monitor;
221         });
222
223         // initiate another BMP request from router 1, create a redundant connection
224         // we expect the connection to be closed
225         final Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, msgRegistry);
226
227
228         // channel 1 should still be open, while channel3 should be closed
229         CheckUtil.checkEquals(() -> assertTrue(channel1.isOpen()));
230         CheckUtil.checkEquals(() -> assertFalse(channel3.isOpen()));
231
232         // now if we close the channel 1 and try it again, it should succeed
233         channel1.close().sync();
234
235         // channel 2 is still open
236         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
237             assertEquals(1, monitor.nonnullRouter().size());
238             return monitor;
239         });
240
241         final Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
242         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
243             assertEquals(2, monitor.nonnullRouter().size());
244             return monitor;
245         });
246
247         // close all channel altogether
248         channel2.close().sync();
249         Thread.sleep(100);
250
251         // sleep for a while to avoid intermittent InMemoryDataTree modification conflict
252         channel4.close().sync();
253
254         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
255             assertNull(monitor.getRouter());
256             return monitor;
257         });
258     }
259
260     private static void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
261         channelFuture.sync();
262     }
263
264     private Channel testMonitoringStation(final String remoteRouterIpAddr) throws InterruptedException,
265             ExecutionException {
266         final Channel channel = connectTestClient(remoteRouterIpAddr, msgRegistry);
267         final RouterId routerId = getRouterId(remoteRouterIpAddr);
268
269         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
270             // now find the current router instance
271             Router router = null;
272             for (final Router r : monitor.nonnullRouter().values()) {
273                 if (routerId.equals(r.getRouterId())) {
274                     router = r;
275                     break;
276                 }
277             }
278             assertNotNull(router);
279             assertEquals(Status.Down, router.getStatus());
280             assertNull(router.getPeer());
281             return router;
282         });
283
284         waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil
285                 .createInitMsg("description", "name", "some info")));
286
287         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
288             Router retRouter = null;
289             for (final Router r : monitor.nonnullRouter().values()) {
290                 if (routerId.equals(r.getRouterId())) {
291                     retRouter = r;
292                     break;
293                 }
294             }
295
296             assertNotNull(retRouter);
297             assertEquals("some info;", retRouter.getInfo());
298             assertEquals("name", retRouter.getName());
299             assertEquals("description", retRouter.getDescription());
300             assertEquals(routerId, retRouter.getRouterId());
301             assertNull(retRouter.getPeer());
302             assertEquals(Status.Up, retRouter.getStatus());
303             return retRouter;
304         });
305
306         waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
307         final KeyedInstanceIdentifier<Router, RouterKey> routerIId =
308                 MONITOR_IID.child(Router.class, new RouterKey(routerId));
309
310         readDataOperational(getDataBroker(), routerIId, router -> {
311             final Map<PeerKey, Peer> peers = router.getPeer();
312             assertNotNull(peers);
313             assertEquals(1, peers.size());
314             final Peer peer = peers.values().iterator().next();
315             assertEquals(PeerType.Global, peer.getType());
316             assertEquals(PEER_ID, peer.getPeerId());
317             assertEquals(PEER1, peer.getBgpId());
318             assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4AddressNoZone());
319             assertEquals(TestUtil.PEER_AS, peer.getAs());
320             assertNull(peer.getPeerDistinguisher());
321             assertNull(peer.getStats());
322
323             assertNotNull(peer.getPrePolicyRib());
324             assertEquals(1, peer.getPrePolicyRib().nonnullTables().size());
325             final Tables prePolicyTable = peer.getPrePolicyRib().nonnullTables().values().iterator().next();
326             assertEquals(Ipv4AddressFamily.VALUE, prePolicyTable.getAfi());
327             assertEquals(UnicastSubsequentAddressFamily.VALUE, prePolicyTable.getSafi());
328             assertFalse(prePolicyTable.getAttributes().getUptodate());
329
330             assertNotNull(peer.getPostPolicyRib());
331             assertEquals(1, peer.getPostPolicyRib().nonnullTables().size());
332             final Tables postPolicyTable = peer.getPrePolicyRib().nonnullTables().values().iterator().next();
333             assertEquals(Ipv4AddressFamily.VALUE, postPolicyTable.getAfi());
334             assertEquals(UnicastSubsequentAddressFamily.VALUE, postPolicyTable.getSafi());
335             assertFalse(postPolicyTable.getAttributes().getUptodate());
336
337             assertNotNull(peer.getPeerSession());
338             final PeerSession peerSession = peer.getPeerSession();
339             assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4AddressNoZone());
340             assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
341             assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
342             assertEquals(Status.Up, peerSession.getStatus());
343             assertNotNull(peerSession.getReceivedOpen());
344             assertNotNull(peerSession.getSentOpen());
345             return router;
346         });
347
348
349         final StatsReportsMessage statsMsg = TestUtil.createStatsReportMsg(PEER1);
350         waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
351         final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
352
353         readDataOperational(getDataBroker(), peerIId.child(Stats.class), peerStats -> {
354             assertNotNull(peerStats.getTimestampSec());
355             final Tlvs tlvs = statsMsg.getTlvs();
356             assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
357             assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(),
358                     peerStats.getDuplicatePrefixAdvertisements());
359             assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
360             assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
361             assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
362             assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(),
363                     peerStats.getInvalidatedClusterListLoop());
364             assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
365             assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
366             assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
367             assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(),
368                     peerStats.getPerAfiSafiAdjRibInRoutes().nonnullAfiSafi().values().iterator().next().getCount()
369                     .toString());
370             assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(),
371                     peerStats.getPerAfiSafiLocRibRoutes().nonnullAfiSafi().values().iterator().next().getCount()
372                     .toString());
373             return peerStats;
374         });
375
376         // route mirror message test
377         final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
378         waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
379
380         readDataOperational(getDataBroker(), peerIId.child(Mirrors.class), routeMirrors -> {
381             assertNotNull(routeMirrors.getTimestampSec());
382             return routeMirrors;
383         });
384
385         waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonitMsg(false, PEER1,
386                 AdjRibInType.PrePolicy)));
387         waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonMsgWithEndOfRibMarker(PEER1,
388                 AdjRibInType.PrePolicy)));
389
390         readDataOperational(getDataBroker(), peerIId.child(PrePolicyRib.class), prePolicyRib -> {
391             assertFalse(prePolicyRib.nonnullTables().isEmpty());
392             final Tables tables = prePolicyRib.nonnullTables().values().iterator().next();
393             assertTrue(tables.getAttributes().getUptodate());
394             assertEquals(3, ((Ipv4RoutesCase) tables.getRoutes()).getIpv4Routes().nonnullIpv4Route().size());
395             return tables;
396         });
397
398         waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonitMsg(false, PEER1,
399                 AdjRibInType.PostPolicy)));
400         waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonMsgWithEndOfRibMarker(PEER1,
401                 AdjRibInType.PostPolicy)));
402
403         readDataOperational(getDataBroker(), peerIId.child(PostPolicyRib.class), postPolicyRib -> {
404             assertFalse(postPolicyRib.nonnullTables().isEmpty());
405             final Tables tables = postPolicyRib.nonnullTables().values().iterator().next();
406             assertTrue(tables.getAttributes().getUptodate());
407             assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet
408                     .rev180329.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase)
409                     tables.getRoutes()).getIpv4Routes().nonnullIpv4Route().size());
410             return tables;
411         });
412
413         waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
414
415         readDataOperational(getDataBroker(), routerIId, router -> {
416             assertNull(router.getPeer());
417             return router;
418         });
419
420         return channel;
421     }
422
423     @Test
424     public void deploySecondInstance() throws Exception {
425         final BmpMonitoringStation monitoringStation2 = new BmpMonitoringStationImpl(getDomBroker(), dispatcher,
426             ribExtension, mappingService.currentSerializer(), clusterSSProv2, new MonitorId("monitor2"),
427             new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), null);
428
429         readDataOperational(getDataBroker(), BMP_II, monitor -> {
430             assertEquals(2, monitor.nonnullMonitor().size());
431             return monitor;
432         });
433
434         monitoringStation2.close();
435     }
436
437     private static Channel connectTestClient(final String routerIp, final BmpMessageRegistry msgRegistry)
438             throws InterruptedException {
439         final BmpHandlerFactory hf = new BmpHandlerFactory(msgRegistry);
440         final Bootstrap b = new Bootstrap();
441         final EventLoopGroup workerGroup;
442         if (Epoll.isAvailable()) {
443             b.channel(EpollSocketChannel.class);
444             workerGroup = new EpollEventLoopGroup();
445         } else {
446             b.channel(NioSocketChannel.class);
447             workerGroup = new NioEventLoopGroup();
448         }
449         b.group(workerGroup);
450         b.option(ChannelOption.SO_KEEPALIVE, true);
451         b.handler(new ChannelInitializer<SocketChannel>() {
452             @Override
453             protected void initChannel(final SocketChannel ch) {
454                 ch.pipeline().addLast(hf.getDecoders());
455                 ch.pipeline().addLast(hf.getEncoders());
456             }
457         });
458         b.localAddress(new InetSocketAddress(routerIp, 0));
459         b.option(ChannelOption.SO_REUSEADDR, true);
460         final ChannelFuture future = b.connect(new InetSocketAddress(MONITOR_LOCAL_ADDRESS, MONITOR_LOCAL_PORT)).sync();
461         future.sync();
462         return future.channel();
463     }
464
465     private static RouterId getRouterId(final String routerIp) {
466         return new RouterId(new IpAddressNoZone(new Ipv4AddressNoZone(routerIp)));
467     }
468 }