BUG-7003: Remove sleeping from Tests
[bgpcep.git] / bgp / bmp-impl / src / test / java / org / opendaylight / protocol / bmp / impl / app / BmpMonitorImplTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.protocol.bmp.impl.app;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.opendaylight.protocol.util.CheckUtil.readData;
18 import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess;
19
20 import com.google.common.base.Optional;
21 import com.google.common.net.InetAddresses;
22 import io.netty.bootstrap.Bootstrap;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOption;
27 import io.netty.channel.EventLoopGroup;
28 import io.netty.channel.epoll.Epoll;
29 import io.netty.channel.epoll.EpollEventLoopGroup;
30 import io.netty.channel.epoll.EpollSocketChannel;
31 import io.netty.channel.nio.NioEventLoopGroup;
32 import io.netty.channel.socket.SocketChannel;
33 import io.netty.channel.socket.nio.NioSocketChannel;
34 import java.net.InetSocketAddress;
35 import java.util.List;
36 import javassist.ClassPool;
37 import org.junit.After;
38 import org.junit.Assert;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.mockito.MockitoAnnotations;
42 import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
43 import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
44 import org.opendaylight.mdsal.binding.generator.impl.GeneratedClassLoadingStrategy;
45 import org.opendaylight.mdsal.binding.generator.impl.ModuleInfoBackedContext;
46 import org.opendaylight.mdsal.binding.generator.util.JavassistUtils;
47 import org.opendaylight.protocol.bgp.inet.RIBActivator;
48 import org.opendaylight.protocol.bgp.parser.impl.BGPActivator;
49 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
50 import org.opendaylight.protocol.bgp.parser.spi.pojo.SimpleBGPExtensionProviderContext;
51 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionProviderContext;
52 import org.opendaylight.protocol.bgp.rib.spi.SimpleRIBExtensionProviderContext;
53 import org.opendaylight.protocol.bmp.api.BmpDispatcher;
54 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
55 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
56 import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
57 import org.opendaylight.protocol.bmp.impl.spi.BmpMonitoringStation;
58 import org.opendaylight.protocol.bmp.parser.BmpActivator;
59 import org.opendaylight.protocol.bmp.parser.message.TestUtil;
60 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
61 import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
62 import org.opendaylight.protocol.concepts.KeyMapping;
63 import org.opendaylight.protocol.util.CheckUtil;
64 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
65 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.pre.policy.rib.tables.routes.Ipv4RoutesCase;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.update.attributes.mp.reach.nlri.advertized.routes.destination.type.DestinationIpv4Case;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.BgpParameters;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.MultiprotocolCapability;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.update.attributes.mp.reach.nlri.AdvertizedRoutes;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.PeerId;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.Ipv4AddressFamily;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.AdjRibInType;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.InitiationMessage;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.PeerType;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.RouteMirroringMessage;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.StatsReportsMessage;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.peer.up.ReceivedOpen;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.peer.up.SentOpen;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.stat.Tlvs;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.BmpMonitor;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.MonitorId;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.RouterId;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.Status;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.bmp.monitor.Monitor;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.bmp.monitor.MonitorKey;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.Peer;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.PeerKey;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.Mirrors;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PeerSession;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PostPolicyRib;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PrePolicyRib;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.Stats;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.routers.Router;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.routers.RouterKey;
99 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
100 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
101 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
102 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
103 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
104
105 public class BmpMonitorImplTest extends AbstractDataBrokerTest {
106     // the local port and address where the monitor (ODL) will listen for incoming BMP request
107     private static final int MONITOR_LOCAL_PORT = 12345;
108     private static final String MONITOR_LOCAL_ADDRESS = "127.0.0.10";
109     private static final String MONITOR_LOCAL_ADDRESS_2 = "127.0.0.11";
110     // the router (monitee) address where we are going to simulate a BMP request from
111     private static final String REMOTE_ROUTER_ADDRESS_1 = "127.0.0.12";
112     private static final String REMOTE_ROUTER_ADDRESS_2 = "127.0.0.13";
113     private static final Ipv4Address PEER1 = new Ipv4Address("20.20.20.20");
114     private static final MonitorId MONITOR_ID = new MonitorId("monitor");
115     private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
116     private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
117     private static final String MD5_PASSWORD = "abcdef";
118     private static final InstanceIdentifier<BmpMonitor> BMP_II = InstanceIdentifier.create(BmpMonitor.class);
119     private BindingToNormalizedNodeCodec mappingService;
120     private RIBActivator ribActivator;
121     private BGPActivator bgpActivator;
122     private BmpActivator bmpActivator;
123     private BmpDispatcher dispatcher;
124     private BmpMonitoringStation bmpApp;
125     private BmpMessageRegistry msgRegistry;
126     private ModuleInfoBackedContext moduleInfoBackedContext;
127
128
129     @Before
130     public void setUp() throws Exception {
131         MockitoAnnotations.initMocks(this);
132         this.mappingService = new BindingToNormalizedNodeCodec(GeneratedClassLoadingStrategy.getTCCLClassLoadingStrategy(),
133                 new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(JavassistUtils.forClassPool(ClassPool.getDefault()))));
134         this.moduleInfoBackedContext = ModuleInfoBackedContext.create();
135         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(InitiationMessage.class));
136         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(CParameters1.class));
137         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(BgpParameters.class));
138         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(MultiprotocolCapability.class));
139         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(DestinationIpv4Case.class));
140         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(AdvertizedRoutes.class));
141         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(SentOpen.class));
142         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(ReceivedOpen.class));
143         this.mappingService.onGlobalContextUpdated(this.moduleInfoBackedContext.tryToCreateSchemaContext().get());
144
145         final KeyMapping keys = KeyMapping.getKeyMapping(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MD5_PASSWORD);
146         this.ribActivator = new RIBActivator();
147         final RIBExtensionProviderContext ribExtension = new SimpleRIBExtensionProviderContext();
148         this.ribActivator.startRIBExtensionProvider(ribExtension);
149
150         this.bgpActivator = new BGPActivator();
151         final BGPExtensionProviderContext context = new SimpleBGPExtensionProviderContext();
152         this.bgpActivator.start(context);
153         final SimpleBmpExtensionProviderContext ctx = new SimpleBmpExtensionProviderContext();
154         this.bmpActivator = new BmpActivator(context);
155         this.bmpActivator.start(ctx);
156         this.msgRegistry = ctx.getBmpMessageRegistry();
157
158         this.dispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
159                 ctx.getBmpMessageRegistry(), new DefaultBmpSessionFactory());
160
161         this.bmpApp = BmpMonitoringStationImpl.createBmpMonitorInstance(ribExtension, this.dispatcher, getDomBroker(),
162                 MONITOR_ID, new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MONITOR_LOCAL_PORT), Optional.of(keys),
163                 this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
164
165         readData(getDataBroker(), BMP_II, monitor -> {
166             Assert.assertEquals(1, monitor.getMonitor().size());
167             final Monitor bmpMonitor = monitor.getMonitor().get(0);
168             Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
169             Assert.assertEquals(0, bmpMonitor.getRouter().size());
170             Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
171             Assert.assertEquals(0, bmpMonitor.getRouter().size());
172             return monitor;
173         });
174     }
175
176     @After
177     public void tearDown() throws Exception {
178         this.ribActivator.close();
179         this.bgpActivator.close();
180         this.bmpActivator.close();
181         this.dispatcher.close();
182         this.bmpApp.close();
183         this.mappingService.close();
184
185         readData(getDataBroker(), BMP_II, monitor -> {
186             assertTrue(monitor.getMonitor().isEmpty());
187             return monitor;
188         });
189     }
190
191     @Test
192     public void testRouterMonitoring() throws Exception {
193         // first test if a single router monitoring is working
194         final Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
195         readData(getDataBroker(), MONITOR_IID, monitor -> {
196             assertEquals(1, monitor.getRouter().size());
197             return monitor;
198         });
199
200         final Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
201         readData(getDataBroker(), MONITOR_IID, monitor -> {
202             assertEquals(2, monitor.getRouter().size());
203             return monitor;
204         });
205
206         // initiate another BMP request from router 1, create a redundant connection
207         // we expect the connection to be closed
208         final Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, this.msgRegistry);
209
210
211         // channel 1 should still be open, while channel3 should be closed
212         CheckUtil.checkEquals(()-> assertTrue(channel1.isOpen()));
213         CheckUtil.checkEquals(()-> assertFalse(channel3.isOpen()));
214
215         // now if we close the channel 1 and try it again, it should succeed
216         waitFutureSuccess(channel1.close());
217
218         // channel 2 is still open
219         readData(getDataBroker(), MONITOR_IID, monitor -> {
220             assertEquals(1, monitor.getRouter().size());
221             return monitor;
222         });
223
224         final Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
225         readData(getDataBroker(), MONITOR_IID, monitor -> {
226             assertEquals(2, monitor.getRouter().size());
227             return monitor;
228         });
229
230         // close all channel altogether
231         waitFutureSuccess(channel2.close());
232         Thread.sleep(100);
233
234         // sleep for a while to avoid intermittent InMemoryDataTree modification conflict
235         waitFutureSuccess(channel4.close());
236
237         readData(getDataBroker(), MONITOR_IID, monitor -> {
238             assertEquals(0, monitor.getRouter().size());
239             return monitor;
240         });
241     }
242
243     private void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
244         waitFutureSuccess(channelFuture);
245     }
246
247     private Channel testMonitoringStation(final String remoteRouterIpAddr) throws InterruptedException {
248         final Channel channel = connectTestClient(remoteRouterIpAddr, this.msgRegistry);
249         final RouterId routerId = getRouterId(remoteRouterIpAddr);
250         try {
251             readData(getDataBroker(), MONITOR_IID, monitor -> {
252                 assertFalse(monitor.getRouter().isEmpty());
253                 // now find the current router instance
254                 Router router = null;
255                 for (final Router r : monitor.getRouter()) {
256                     if (routerId.equals(r.getRouterId())) {
257                         router = r;
258                         break;
259                     }
260                 }
261                 assertNotNull(router);
262                 assertEquals(Status.Down, router.getStatus());
263                 assertTrue(router.getPeer().isEmpty());
264                 return router;
265             });
266
267             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
268
269             readData(getDataBroker(), MONITOR_IID, monitor -> {
270                 assertFalse(monitor.getRouter().isEmpty());
271                 Router retRouter = null;
272                 for (final Router r : monitor.getRouter()) {
273                     if (routerId.equals(r.getRouterId())) {
274                         retRouter = r;
275                         break;
276                     }
277                 }
278
279                 assertEquals("some info;", retRouter.getInfo());
280                 assertEquals("name", retRouter.getName());
281                 assertEquals("description", retRouter.getDescription());
282                 assertEquals(routerId, retRouter.getRouterId());
283                 assertTrue(retRouter.getPeer().isEmpty());
284                 assertEquals(Status.Up, retRouter.getStatus());
285                 return retRouter;
286             });
287
288             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
289             final KeyedInstanceIdentifier<Router, RouterKey> routerIId = MONITOR_IID.child(Router.class, new RouterKey(routerId));
290
291             readData(getDataBroker(), routerIId, router -> {
292                 final List<Peer> peers = router.getPeer();
293                 assertEquals(1, peers.size());
294                 final Peer peer = peers.get(0);
295                 assertEquals(PeerType.Global, peer.getType());
296                 assertEquals(PEER_ID, peer.getPeerId());
297                 assertEquals(PEER1, peer.getBgpId());
298                 assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
299                 assertEquals(TestUtil.PEER_AS, peer.getAs());
300                 assertNull(peer.getDistinguisher());
301                 assertNull(peer.getStats());
302
303                 assertNotNull(peer.getPrePolicyRib());
304                 assertEquals(1, peer.getPrePolicyRib().getTables().size());
305                 final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
306                 assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
307                 assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
308                 assertFalse(prePolicyTable.getAttributes().isUptodate());
309                 assertNotNull(prePolicyTable.getRoutes());
310
311                 assertNotNull(peer.getPostPolicyRib());
312                 assertEquals(1, peer.getPostPolicyRib().getTables().size());
313                 final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
314                 assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
315                 assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
316                 assertFalse(postPolicyTable.getAttributes().isUptodate());
317                 assertNotNull(postPolicyTable.getRoutes());
318
319                 assertNotNull(peer.getPeerSession());
320                 final PeerSession peerSession = peer.getPeerSession();
321                 assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
322                 assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
323                 assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
324                 assertEquals(Status.Up, peerSession.getStatus());
325                 assertNotNull(peerSession.getReceivedOpen());
326                 assertNotNull(peerSession.getSentOpen());
327                 return router;
328             });
329
330
331             final StatsReportsMessage statsMsg = TestUtil.createStatsReportMsg(PEER1);
332             waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
333             final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
334
335             readData(getDataBroker(), peerIId.child(Stats.class), peerStats -> {
336                 assertNotNull(peerStats.getTimestampSec());
337                 final Tlvs tlvs = statsMsg.getTlvs();
338                 assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
339                 assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(), peerStats.getDuplicatePrefixAdvertisements());
340                 assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
341                 assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
342                 assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
343                 assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(), peerStats.getInvalidatedClusterListLoop());
344                 assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
345                 assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
346                 assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
347                 assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(), peerStats.getPerAfiSafiAdjRibInRoutes().getAfiSafi().get(0).getCount().toString());
348                 assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(), peerStats.getPerAfiSafiLocRibRoutes().getAfiSafi().get(0).getCount().toString());
349                 return peerStats;
350             });
351
352             // route mirror message test
353             final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
354             waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
355
356             readData(getDataBroker(), peerIId.child(Mirrors.class), routeMirrors -> {
357                 assertNotNull(routeMirrors.getTimestampSec());
358                 return routeMirrors;
359             });
360
361             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy)));
362             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy)));
363
364             readData(getDataBroker(), peerIId.child(PrePolicyRib.class), prePolicyRib -> {
365                 assertTrue(!prePolicyRib.getTables().isEmpty());
366                 final Tables tables = prePolicyRib.getTables().get(0);
367                 assertTrue(tables.getAttributes().isUptodate());
368                 assertEquals(3, ((Ipv4RoutesCase) tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
369                 return tables;
370             });
371
372             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy)));
373             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy)));
374
375             readData(getDataBroker(), peerIId.child(PostPolicyRib.class), postPolicyRib -> {
376                 assertTrue(!postPolicyRib.getTables().isEmpty());
377                 final Tables tables = postPolicyRib.getTables().get(0);
378                 assertTrue(tables.getAttributes().isUptodate());
379                 assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase)
380                         tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
381                 return tables;
382             });
383
384             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
385
386             readData(getDataBroker(), routerIId, router -> {
387                 final List<Peer> peersAfterDown = router.getPeer();
388                 assertTrue(peersAfterDown.isEmpty());
389                 return router;
390             });
391         } catch (final Exception e) {
392             final StringBuffer ex = new StringBuffer();
393             ex.append(e.getMessage()).append("\n");
394             for (final StackTraceElement element : e.getStackTrace()) {
395                 ex.append(element.toString() + "\n");
396             }
397             fail(ex.toString());
398         }
399         return channel;
400     }
401
402     @Test
403     public void deploySecondInstance() throws Exception {
404         final BmpMonitoringStation monitoringStation2 = BmpMonitoringStationImpl.createBmpMonitorInstance(new SimpleRIBExtensionProviderContext(), this.dispatcher, getDomBroker(),
405                 new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), Optional.of(KeyMapping.getKeyMapping()),
406                 this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
407
408         readData(getDataBroker(), BMP_II, monitor -> {
409             Assert.assertEquals(2, monitor.getMonitor().size());
410             return monitor;
411         });
412
413         monitoringStation2.close();
414     }
415
416     private Channel connectTestClient(final String routerIp, final BmpMessageRegistry msgRegistry) throws InterruptedException {
417         final BmpHandlerFactory hf = new BmpHandlerFactory(msgRegistry);
418         final Bootstrap b = new Bootstrap();
419         final EventLoopGroup workerGroup;
420         if(Epoll.isAvailable()){
421             b.channel(EpollSocketChannel.class);
422             workerGroup =new EpollEventLoopGroup();
423         } else {
424             b.channel(NioSocketChannel.class);
425             workerGroup = new NioEventLoopGroup();
426         }
427         b.group(workerGroup);
428         b.option(ChannelOption.SO_KEEPALIVE, true);
429         b.handler(new ChannelInitializer<SocketChannel>() {
430             @Override
431             protected void initChannel(final SocketChannel ch) throws Exception {
432                 ch.pipeline().addLast(hf.getDecoders());
433                 ch.pipeline().addLast(hf.getEncoders());
434             }
435         });
436         b.localAddress(new InetSocketAddress(routerIp, 0));
437         b.option(ChannelOption.SO_REUSEADDR, true);
438         final ChannelFuture future = b.connect(new InetSocketAddress(MONITOR_LOCAL_ADDRESS, MONITOR_LOCAL_PORT)).sync();
439         waitFutureSuccess(future);
440         return future.channel();
441     }
442
443     private RouterId getRouterId(final String routerIp) {
444         return new RouterId(new IpAddress(new Ipv4Address(routerIp)));
445     }
446 }