Migrate to MD-SAL APIs
[bgpcep.git] / bmp / bmp-impl / src / test / java / org / opendaylight / protocol / bmp / impl / app / BmpMonitorImplTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bmp.impl.app;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.ArgumentMatchers.any;
16 import static org.mockito.Mockito.doAnswer;
17 import static org.opendaylight.protocol.bmp.parser.message.TestUtil.createRouteMonMsgWithEndOfRibMarker;
18 import static org.opendaylight.protocol.bmp.parser.message.TestUtil.createRouteMonitMsg;
19 import static org.opendaylight.protocol.util.CheckTestUtil.checkNotPresentOperational;
20 import static org.opendaylight.protocol.util.CheckTestUtil.readDataOperational;
21
22 import com.google.common.net.InetAddresses;
23 import io.netty.bootstrap.Bootstrap;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelInitializer;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.EventLoopGroup;
29 import io.netty.channel.epoll.Epoll;
30 import io.netty.channel.epoll.EpollEventLoopGroup;
31 import io.netty.channel.epoll.EpollSocketChannel;
32 import io.netty.channel.nio.NioEventLoopGroup;
33 import io.netty.channel.socket.SocketChannel;
34 import io.netty.channel.socket.nio.NioSocketChannel;
35 import java.net.InetSocketAddress;
36 import java.util.List;
37 import java.util.concurrent.ExecutionException;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.mockito.Mock;
42 import org.mockito.MockitoAnnotations;
43 import org.opendaylight.mdsal.binding.dom.adapter.BindingToNormalizedNodeCodec;
44 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest;
45 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractDataBrokerTestCustomizer;
46 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
47 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
48 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
49 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
50 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
51 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
52 import org.opendaylight.protocol.bgp.inet.RIBActivator;
53 import org.opendaylight.protocol.bgp.parser.impl.BGPActivator;
54 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
55 import org.opendaylight.protocol.bgp.parser.spi.pojo.SimpleBGPExtensionProviderContext;
56 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionProviderContext;
57 import org.opendaylight.protocol.bgp.rib.spi.SimpleRIBExtensionProviderContext;
58 import org.opendaylight.protocol.bmp.api.BmpDispatcher;
59 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
60 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
61 import org.opendaylight.protocol.bmp.impl.config.BmpDeployerDependencies;
62 import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
63 import org.opendaylight.protocol.bmp.impl.spi.BmpMonitoringStation;
64 import org.opendaylight.protocol.bmp.parser.BmpActivator;
65 import org.opendaylight.protocol.bmp.parser.message.TestUtil;
66 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
67 import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
68 import org.opendaylight.protocol.util.CheckUtil;
69 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
70 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev180329.bmp.monitor.monitor.router.peer.pre.policy.rib.tables.routes.Ipv4RoutesCase;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.PeerId;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.Tables;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.Ipv4AddressFamily;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.UnicastSubsequentAddressFamily;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev180329.AdjRibInType;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev180329.PeerType;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev180329.RouteMirroringMessage;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev180329.StatsReportsMessage;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev180329.stat.Tlvs;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.BmpMonitor;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.MonitorId;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.RouterId;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.Status;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.bmp.monitor.Monitor;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.bmp.monitor.MonitorKey;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.peers.Peer;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.peers.PeerKey;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.peers.peer.Mirrors;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.peers.peer.PeerSession;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.peers.peer.PostPolicyRib;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.peers.peer.PrePolicyRib;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.peers.peer.Stats;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.routers.Router;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev180329.routers.RouterKey;
96 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
97 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
98 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
99 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
100 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
101 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
102 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
103
104 public class BmpMonitorImplTest extends AbstractConcurrentDataBrokerTest {
105     // the local port and address where the monitor (ODL) will listen for incoming BMP request
106     private static final int MONITOR_LOCAL_PORT = 12345;
107     private static final String MONITOR_LOCAL_ADDRESS = "127.0.0.10";
108     private static final String MONITOR_LOCAL_ADDRESS_2 = "127.0.0.11";
109     // the router (monitee) address where we are going to simulate a BMP request from
110     private static final String REMOTE_ROUTER_ADDRESS_1 = "127.0.0.12";
111     private static final String REMOTE_ROUTER_ADDRESS_2 = "127.0.0.13";
112     private static final Ipv4Address PEER1 = new Ipv4Address("20.20.20.20");
113     private static final MonitorId MONITOR_ID = new MonitorId("monitor");
114     private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID = InstanceIdentifier
115         .create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
116     private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
117     private static final InstanceIdentifier<BmpMonitor> BMP_II = InstanceIdentifier.create(BmpMonitor.class);
118     private BindingToNormalizedNodeCodec mappingService;
119     private RIBActivator ribActivator;
120     private BGPActivator bgpActivator;
121     private BmpActivator bmpActivator;
122     private BmpDispatcher dispatcher;
123     private BmpMonitoringStation bmpApp;
124     private BmpMessageRegistry msgRegistry;
125     private RIBExtensionProviderContext ribExtension;
126     private ClusterSingletonService singletonService;
127     private ClusterSingletonService singletonService2;
128     @Mock
129     private ClusterSingletonServiceRegistration singletonServiceRegistration;
130     @Mock
131     private ClusterSingletonServiceRegistration singletonServiceRegistration2;
132     @Mock
133     private ClusterSingletonServiceProvider clusterSSProv;
134     @Mock
135     private ClusterSingletonServiceProvider clusterSSProv2;
136     private DOMSchemaService schemaService;
137
138     @Before
139     public void setUp() throws Exception {
140         super.setup();
141
142         MockitoAnnotations.initMocks(this);
143
144         doAnswer(invocationOnMock -> {
145             BmpMonitorImplTest.this.singletonService = (ClusterSingletonService) invocationOnMock.getArguments()[0];
146             this.singletonService.instantiateServiceInstance();
147             return BmpMonitorImplTest.this.singletonServiceRegistration;
148         }).when(this.clusterSSProv).registerClusterSingletonService(any(ClusterSingletonService.class));
149
150         doAnswer(invocationOnMock -> BmpMonitorImplTest.this.singletonService.closeServiceInstance())
151             .when(this.singletonServiceRegistration).close();
152
153         doAnswer(invocationOnMock -> {
154             this.singletonService2 = (ClusterSingletonService) invocationOnMock.getArguments()[0];
155             this.singletonService2.instantiateServiceInstance();
156             return BmpMonitorImplTest.this.singletonServiceRegistration2;
157         }).when(this.clusterSSProv2).registerClusterSingletonService(any(ClusterSingletonService.class));
158
159         doAnswer(invocationOnMock -> BmpMonitorImplTest.this.singletonService2.closeServiceInstance())
160             .when(this.singletonServiceRegistration2).close();
161
162         this.mappingService.onGlobalContextUpdated(this.schemaService.getGlobalContext());
163         this.ribActivator = new RIBActivator();
164         this.ribExtension = new SimpleRIBExtensionProviderContext();
165         this.ribActivator.startRIBExtensionProvider(this.ribExtension, this.mappingService);
166
167         this.bgpActivator = new BGPActivator();
168         final BGPExtensionProviderContext context = new SimpleBGPExtensionProviderContext();
169         this.bgpActivator.start(context);
170         final SimpleBmpExtensionProviderContext ctx = new SimpleBmpExtensionProviderContext();
171         this.bmpActivator = new BmpActivator(context);
172         this.bmpActivator.start(ctx);
173         this.msgRegistry = ctx.getBmpMessageRegistry();
174
175         this.dispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
176             ctx.getBmpMessageRegistry(), new DefaultBmpSessionFactory());
177
178         final InetSocketAddress inetAddress = new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS),
179             MONITOR_LOCAL_PORT);
180
181         final DOMDataTreeWriteTransaction wTx = getDomBroker().newWriteOnlyTransaction();
182         final ContainerNode parentNode = Builders.containerBuilder().withNodeIdentifier(
183                 new NodeIdentifier(BmpMonitor.QNAME))
184                 .addChild(ImmutableNodes.mapNodeBuilder(Monitor.QNAME).build()).build();
185         wTx.merge(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(BmpMonitor.QNAME), parentNode);
186         wTx.commit().get();
187
188         final BmpDeployerDependencies bmpDependecies = new BmpDeployerDependencies(getDataBroker(), getDomBroker(),
189             this.ribExtension, this.mappingService.getCodecFactory(), this.schemaService.getGlobalContext(),
190             this.clusterSSProv);
191         this.bmpApp = new BmpMonitoringStationImpl(bmpDependecies, this.dispatcher, MONITOR_ID, inetAddress, null);
192         readDataOperational(getDataBroker(), BMP_II, monitor -> {
193             assertEquals(1, monitor.getMonitor().size());
194             final Monitor bmpMonitor = monitor.getMonitor().get(0);
195             assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
196             assertEquals(0, bmpMonitor.getRouter().size());
197             assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
198             assertEquals(0, bmpMonitor.getRouter().size());
199             return monitor;
200         });
201     }
202
203     @Override
204     protected final AbstractDataBrokerTestCustomizer createDataBrokerTestCustomizer() {
205         final AbstractDataBrokerTestCustomizer customizer = super.createDataBrokerTestCustomizer();
206         this.mappingService = customizer.getBindingToNormalized();
207         this.schemaService = customizer.getSchemaService();
208         return customizer;
209     }
210
211     @After
212     public void tearDown() throws Exception {
213         this.ribActivator.close();
214         this.bgpActivator.close();
215         this.bmpActivator.close();
216         this.dispatcher.close();
217         this.bmpApp.close();
218         this.mappingService.close();
219
220         checkNotPresentOperational(getDataBroker(), BMP_II);
221     }
222
223     @Test(timeout = 20000)
224     public void testRouterMonitoring() throws Exception {
225         // first test if a single router monitoring is working
226         final Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
227         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
228             assertEquals(1, monitor.getRouter().size());
229             return monitor;
230         });
231
232         final Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
233         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
234             assertEquals(2, monitor.getRouter().size());
235             return monitor;
236         });
237
238         // initiate another BMP request from router 1, create a redundant connection
239         // we expect the connection to be closed
240         final Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, this.msgRegistry);
241
242
243         // channel 1 should still be open, while channel3 should be closed
244         CheckUtil.checkEquals(() -> assertTrue(channel1.isOpen()));
245         CheckUtil.checkEquals(() -> assertFalse(channel3.isOpen()));
246
247         // now if we close the channel 1 and try it again, it should succeed
248         channel1.close().sync();
249
250         // channel 2 is still open
251         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
252             assertEquals(1, monitor.getRouter().size());
253             return monitor;
254         });
255
256         final Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
257         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
258             assertEquals(2, monitor.getRouter().size());
259             return monitor;
260         });
261
262         // close all channel altogether
263         channel2.close().sync();
264         Thread.sleep(100);
265
266         // sleep for a while to avoid intermittent InMemoryDataTree modification conflict
267         channel4.close().sync();
268
269         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
270             assertNull(monitor.getRouter());
271             return monitor;
272         });
273     }
274
275     private static void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
276         channelFuture.sync();
277     }
278
279     private Channel testMonitoringStation(final String remoteRouterIpAddr) throws InterruptedException,
280             ExecutionException {
281         final Channel channel = connectTestClient(remoteRouterIpAddr, this.msgRegistry);
282         final RouterId routerId = getRouterId(remoteRouterIpAddr);
283
284         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
285             assertNotNull(monitor.getRouter());
286             // now find the current router instance
287             Router router = null;
288             for (final Router r : monitor.getRouter()) {
289                 if (routerId.equals(r.getRouterId())) {
290                     router = r;
291                     break;
292                 }
293             }
294             assertNotNull(router);
295             assertEquals(Status.Down, router.getStatus());
296             assertTrue(router.getPeer().isEmpty());
297             return router;
298         });
299
300         waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil
301                 .createInitMsg("description", "name", "some info")));
302
303         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
304             assertNotNull(monitor.getRouter());
305             Router retRouter = null;
306             for (final Router r : monitor.getRouter()) {
307                 if (routerId.equals(r.getRouterId())) {
308                     retRouter = r;
309                     break;
310                 }
311             }
312
313             assertEquals("some info;", retRouter.getInfo());
314             assertEquals("name", retRouter.getName());
315             assertEquals("description", retRouter.getDescription());
316             assertEquals(routerId, retRouter.getRouterId());
317             assertTrue(retRouter.getPeer().isEmpty());
318             assertEquals(Status.Up, retRouter.getStatus());
319             return retRouter;
320         });
321
322         waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
323         final KeyedInstanceIdentifier<Router, RouterKey> routerIId =
324                 MONITOR_IID.child(Router.class, new RouterKey(routerId));
325
326         readDataOperational(getDataBroker(), routerIId, router -> {
327             final List<Peer> peers = router.getPeer();
328             assertNotNull(peers.size());
329             assertEquals(1, peers.size());
330             final Peer peer = peers.get(0);
331             assertEquals(PeerType.Global, peer.getType());
332             assertEquals(PEER_ID, peer.getPeerId());
333             assertEquals(PEER1, peer.getBgpId());
334             assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
335             assertEquals(TestUtil.PEER_AS, peer.getAs());
336             assertNull(peer.getPeerDistinguisher());
337             assertNull(peer.getStats());
338
339             assertNotNull(peer.getPrePolicyRib());
340             assertEquals(1, peer.getPrePolicyRib().getTables().size());
341             final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
342             assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
343             assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
344             assertFalse(prePolicyTable.getAttributes().isUptodate());
345
346             assertNotNull(peer.getPostPolicyRib());
347             assertEquals(1, peer.getPostPolicyRib().getTables().size());
348             final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
349             assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
350             assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
351             assertFalse(postPolicyTable.getAttributes().isUptodate());
352
353             assertNotNull(peer.getPeerSession());
354             final PeerSession peerSession = peer.getPeerSession();
355             assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
356             assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
357             assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
358             assertEquals(Status.Up, peerSession.getStatus());
359             assertNotNull(peerSession.getReceivedOpen());
360             assertNotNull(peerSession.getSentOpen());
361             return router;
362         });
363
364
365         final StatsReportsMessage statsMsg = TestUtil.createStatsReportMsg(PEER1);
366         waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
367         final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
368
369         readDataOperational(getDataBroker(), peerIId.child(Stats.class), peerStats -> {
370             assertNotNull(peerStats.getTimestampSec());
371             final Tlvs tlvs = statsMsg.getTlvs();
372             assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
373             assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(),
374                     peerStats.getDuplicatePrefixAdvertisements());
375             assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
376             assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
377             assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
378             assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(),
379                     peerStats.getInvalidatedClusterListLoop());
380             assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
381             assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
382             assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
383             assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(),
384                     peerStats.getPerAfiSafiAdjRibInRoutes().getAfiSafi().get(0).getCount().toString());
385             assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(),
386                     peerStats.getPerAfiSafiLocRibRoutes().getAfiSafi().get(0).getCount().toString());
387             return peerStats;
388         });
389
390         // route mirror message test
391         final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
392         waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
393
394         readDataOperational(getDataBroker(), peerIId.child(Mirrors.class), routeMirrors -> {
395             assertNotNull(routeMirrors.getTimestampSec());
396             return routeMirrors;
397         });
398
399         waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonitMsg(false, PEER1,
400                 AdjRibInType.PrePolicy)));
401         waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonMsgWithEndOfRibMarker(PEER1,
402                 AdjRibInType.PrePolicy)));
403
404         readDataOperational(getDataBroker(), peerIId.child(PrePolicyRib.class), prePolicyRib -> {
405             assertTrue(!prePolicyRib.getTables().isEmpty());
406             final Tables tables = prePolicyRib.getTables().get(0);
407             assertTrue(tables.getAttributes().isUptodate());
408             assertEquals(3, ((Ipv4RoutesCase) tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
409             return tables;
410         });
411
412         waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonitMsg(false, PEER1,
413                 AdjRibInType.PostPolicy)));
414         waitWriteAndFlushSuccess(channel.writeAndFlush(createRouteMonMsgWithEndOfRibMarker(PEER1,
415                 AdjRibInType.PostPolicy)));
416
417         readDataOperational(getDataBroker(), peerIId.child(PostPolicyRib.class), postPolicyRib -> {
418             assertTrue(!postPolicyRib.getTables().isEmpty());
419             final Tables tables = postPolicyRib.getTables().get(0);
420             assertTrue(tables.getAttributes().isUptodate());
421             assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet
422                     .rev180329.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase)
423                     tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
424             return tables;
425         });
426
427         waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
428
429         readDataOperational(getDataBroker(), routerIId, router -> {
430             assertNull(router.getPeer());
431             return router;
432         });
433
434         return channel;
435     }
436
437     @Test
438     public void deploySecondInstance() throws Exception {
439         final BmpDeployerDependencies bmpDependecies = new BmpDeployerDependencies(getDataBroker(), getDomBroker(),
440             this.ribExtension, this.mappingService.getCodecFactory(), this.schemaService.getGlobalContext(),
441             this.clusterSSProv2);
442
443         final BmpMonitoringStation monitoringStation2 = new BmpMonitoringStationImpl(bmpDependecies,
444             this.dispatcher, new MonitorId("monitor2"),
445                 new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), null);
446
447         readDataOperational(getDataBroker(), BMP_II, monitor -> {
448             assertEquals(2, monitor.getMonitor().size());
449             return monitor;
450         });
451
452         monitoringStation2.close();
453     }
454
455     private static Channel connectTestClient(final String routerIp, final BmpMessageRegistry msgRegistry)
456             throws InterruptedException {
457         final BmpHandlerFactory hf = new BmpHandlerFactory(msgRegistry);
458         final Bootstrap b = new Bootstrap();
459         final EventLoopGroup workerGroup;
460         if (Epoll.isAvailable()) {
461             b.channel(EpollSocketChannel.class);
462             workerGroup = new EpollEventLoopGroup();
463         } else {
464             b.channel(NioSocketChannel.class);
465             workerGroup = new NioEventLoopGroup();
466         }
467         b.group(workerGroup);
468         b.option(ChannelOption.SO_KEEPALIVE, true);
469         b.handler(new ChannelInitializer<SocketChannel>() {
470             @Override
471             protected void initChannel(final SocketChannel ch) {
472                 ch.pipeline().addLast(hf.getDecoders());
473                 ch.pipeline().addLast(hf.getEncoders());
474             }
475         });
476         b.localAddress(new InetSocketAddress(routerIp, 0));
477         b.option(ChannelOption.SO_REUSEADDR, true);
478         final ChannelFuture future = b.connect(new InetSocketAddress(MONITOR_LOCAL_ADDRESS, MONITOR_LOCAL_PORT)).sync();
479         future.sync();
480         return future.channel();
481     }
482
483     private static RouterId getRouterId(final String routerIp) {
484         return new RouterId(new IpAddress(new Ipv4Address(routerIp)));
485     }
486 }