BUG-6120: Fix occasionally failure of test
[bgpcep.git] / bgp / bmp-impl / src / test / java / org / opendaylight / protocol / bmp / impl / app / BmpMonitorImplTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.protocol.bmp.impl.app;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17
18 import com.google.common.base.Charsets;
19 import com.google.common.base.Optional;
20 import com.google.common.net.InetAddresses;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import io.netty.bootstrap.Bootstrap;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOption;
27 import io.netty.channel.nio.NioEventLoopGroup;
28 import io.netty.channel.socket.SocketChannel;
29 import io.netty.util.concurrent.Future;
30 import java.net.InetSocketAddress;
31 import java.util.List;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34 import javassist.ClassPool;
35 import org.junit.After;
36 import org.junit.Assert;
37 import org.junit.Before;
38 import org.junit.Test;
39 import org.mockito.Mock;
40 import org.mockito.Mockito;
41 import org.mockito.MockitoAnnotations;
42 import org.opendaylight.controller.config.yang.bmp.impl.MonitoredRouter;
43 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
44 import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
45 import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
46 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
47 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
48 import org.opendaylight.protocol.bgp.parser.impl.BGPActivator;
49 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
50 import org.opendaylight.protocol.bgp.parser.spi.pojo.SimpleBGPExtensionProviderContext;
51 import org.opendaylight.protocol.bgp.rib.impl.RIBActivator;
52 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionProviderContext;
53 import org.opendaylight.protocol.bgp.rib.spi.SimpleRIBExtensionProviderContext;
54 import org.opendaylight.protocol.bmp.api.BmpDispatcher;
55 import org.opendaylight.protocol.bmp.impl.BmpActivator;
56 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
57 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
58 import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
59 import org.opendaylight.protocol.bmp.impl.spi.BmpMonitoringStation;
60 import org.opendaylight.protocol.bmp.impl.test.TestUtil;
61 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
62 import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
63 import org.opendaylight.tcpmd5.api.KeyAccess;
64 import org.opendaylight.tcpmd5.api.KeyAccessFactory;
65 import org.opendaylight.tcpmd5.api.KeyMapping;
66 import org.opendaylight.tcpmd5.netty.MD5ChannelFactory;
67 import org.opendaylight.tcpmd5.netty.MD5NioServerSocketChannelFactory;
68 import org.opendaylight.tcpmd5.netty.MD5NioSocketChannelFactory;
69 import org.opendaylight.tcpmd5.netty.MD5ServerChannelFactory;
70 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
71 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.pre.policy.rib.tables.routes.Ipv4RoutesCase;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.update.attributes.mp.reach.nlri.advertized.routes.destination.type.DestinationIpv4Case;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.BgpParameters;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.MultiprotocolCapability;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.update.attributes.mp.reach.nlri.AdvertizedRoutes;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.PeerId;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.Ipv4AddressFamily;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.AdjRibInType;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.InitiationMessage;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.PeerType;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.RouteMirroringMessage;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.StatsReportsMessage;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.peer.up.ReceivedOpen;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.peer.up.SentOpen;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.stat.Tlvs;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.BmpMonitor;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.MonitorId;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.RouterId;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.Status;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.bmp.monitor.Monitor;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.bmp.monitor.MonitorKey;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.Peer;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.PeerKey;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.Mirrors;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PeerSession;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PostPolicyRib;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PrePolicyRib;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.Stats;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.routers.Router;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.routers.RouterKey;
105 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
106 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
107 import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
108 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
109 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
110 import org.opendaylight.yangtools.yang.binding.DataObject;
111 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
112 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
113 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
114
115 public class BmpMonitorImplTest extends AbstractDataBrokerTest {
116     // the local port and address where the monitor (ODL) will listen for incoming BMP request
117     private static final int MONITOR_LOCAL_PORT = 12345;
118     private static final String MONITOR_LOCAL_ADDRESS = "127.0.0.10";
119     private static final String MONITOR_LOCAL_ADDRESS_2 = "127.0.0.11";
120     // the router (monitee) address where we are going to simulate a BMP request from
121     private static final String REMOTE_ROUTER_ADDRESS_1 = "127.0.0.12";
122     private static final String REMOTE_ROUTER_ADDRESS_2 = "127.0.0.13";
123     private static final Ipv4Address PEER1 = new Ipv4Address("20.20.20.20");
124     private static final MonitorId MONITOR_ID = new MonitorId("monitor");
125     private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
126     private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
127     private static final String MD5_PASSWORD = "abcdef";
128
129     private BindingToNormalizedNodeCodec mappingService;
130     private RIBActivator ribActivator;
131     private BGPActivator bgpActivator;
132     private BmpActivator bmpActivator;
133     private BmpDispatcher dispatcher;
134     private BmpMonitoringStation bmpApp;
135     private BmpMessageRegistry msgRegistry;
136     private MD5NioSocketChannelFactory scfMd5;
137     private List<MonitoredRouter> mrs;
138     private ModuleInfoBackedContext moduleInfoBackedContext;
139
140     @Mock private KeyAccess mockKeyAccess;
141     @Mock private KeyAccessFactory kaf;
142
143     @Before
144     public void setUp() throws Exception {
145         MockitoAnnotations.initMocks(this);
146         this.mappingService = new BindingToNormalizedNodeCodec(GeneratedClassLoadingStrategy.getTCCLClassLoadingStrategy(),
147                 new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(JavassistUtils.forClassPool(ClassPool.getDefault()))));
148         this.moduleInfoBackedContext = ModuleInfoBackedContext.create();
149         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(InitiationMessage.class));
150         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(CParameters1.class));
151         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(BgpParameters.class));
152         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(MultiprotocolCapability.class));
153         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(DestinationIpv4Case.class));
154         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(AdvertizedRoutes.class));
155         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(SentOpen.class));
156         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(ReceivedOpen.class));
157         this.mappingService.onGlobalContextUpdated(this.moduleInfoBackedContext.tryToCreateSchemaContext().get());
158
159         final KeyMapping keys = new KeyMapping();
160         keys.put(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MD5_PASSWORD.getBytes(Charsets.US_ASCII));
161
162         Mockito.doReturn(this.mockKeyAccess).when(this.kaf).getKeyAccess(Mockito.any(java.nio.channels.Channel.class));
163         Mockito.doReturn(keys).when(this.mockKeyAccess).getKeys();
164         Mockito.doNothing().when(this.mockKeyAccess).setKeys(Mockito.any(KeyMapping.class));
165
166         final MD5NioServerSocketChannelFactory scfServerMd5 = new MD5NioServerSocketChannelFactory(this.kaf);
167         this.scfMd5 = new MD5NioSocketChannelFactory(this.kaf);
168
169         this.ribActivator = new RIBActivator();
170         final RIBExtensionProviderContext ribExtension = new SimpleRIBExtensionProviderContext();
171         this.ribActivator.startRIBExtensionProvider(ribExtension);
172
173         this.bgpActivator = new BGPActivator();
174         final BGPExtensionProviderContext context = new SimpleBGPExtensionProviderContext();
175         this.bgpActivator.start(context);
176         final SimpleBmpExtensionProviderContext ctx = new SimpleBmpExtensionProviderContext();
177         this.bmpActivator = new BmpActivator(context);
178         this.bmpActivator.start(ctx);
179         this.msgRegistry = ctx.getBmpMessageRegistry();
180
181         this.dispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
182                 ctx.getBmpMessageRegistry(), new DefaultBmpSessionFactory(), Optional.<MD5ChannelFactory<?>>of(this.scfMd5),
183                 Optional.<MD5ServerChannelFactory<?>>of(scfServerMd5));
184
185         this.bmpApp = BmpMonitoringStationImpl.createBmpMonitorInstance(ribExtension, this.dispatcher, getDomBroker(),
186                 MONITOR_ID, new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MONITOR_LOCAL_PORT), Optional.of(keys),
187                 this.mappingService.getCodecFactory(), moduleInfoBackedContext.getSchemaContext(), this.mrs);
188
189         final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
190         Assert.assertEquals(1, monitor.getMonitor().size());
191         final Monitor bmpMonitor = monitor.getMonitor().get(0);
192         Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
193         Assert.assertEquals(0, bmpMonitor.getRouter().size());
194     }
195
196     @After
197     public void tearDown() throws Exception {
198         this.ribActivator.close();
199         this.bgpActivator.close();
200         this.bmpActivator.close();
201         this.dispatcher.close();
202         this.bmpApp.close();
203         this.mappingService.close();
204         final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
205         assertTrue(monitor.getMonitor().isEmpty());
206     }
207
208     @Test
209     public void testRouterMonitoring() throws Exception {
210         // first test if a single router monitoring is working
211         final Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
212         assertEquals(1, getBmpData(MONITOR_IID).get().getRouter().size());
213
214         final Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
215         assertEquals(2, getBmpData(MONITOR_IID).get().getRouter().size());
216
217         // initiate another BMP request from router 1, create a redundant connection
218         // we expect the connection to be closed
219         final Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, this.msgRegistry);
220
221         // channel 1 should still be open, while channel3 should be closed
222         assertTrue(channel1.isOpen());
223         assertFalse(channel3.isOpen());
224         // now if we close the channel 1 and try it again, it should succeed
225         waitFutureSuccess(channel1.close());
226         // channel 2 is still open
227         assertEquals(1, getBmpData(MONITOR_IID).get().getRouter().size());
228
229         Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
230         assertEquals(2, getBmpData(MONITOR_IID).get().getRouter().size());
231
232         // close all channel altogether
233         waitFutureSuccess(channel2.close());
234         // sleep for a while to avoid intermittent InMemoryDataTree modification conflict
235         waitFutureSuccess(channel4.close());
236         assertEquals(0, getBmpData(MONITOR_IID).get().getRouter().size());
237     }
238
239     private static <T extends Future> void waitFutureSuccess(final T future) throws InterruptedException {
240         final CountDownLatch latch = new CountDownLatch(1);
241         future.addListener(future1 -> latch.countDown());
242         Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
243         Thread.sleep(500);
244     }
245
246     private void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
247         waitFutureSuccess(channelFuture);
248         Thread.sleep(500);
249     }
250
251     private Channel testMonitoringStation(String remoteRouterIpAddr) throws InterruptedException {
252         final Channel channel = connectTestClient(remoteRouterIpAddr, this.msgRegistry);
253         final RouterId routerId = getRouterId(remoteRouterIpAddr);
254         try {
255             Thread.sleep(500);
256
257             final Monitor monitor = getBmpData(MONITOR_IID).get();
258             assertFalse(monitor.getRouter().isEmpty());
259             // now find the current router instance
260             Router router = null;
261             for (Router r : monitor.getRouter()) {
262                 if (routerId.equals(r.getRouterId())) {
263                     router = r;
264                     break;
265                 }
266             }
267             assertNotNull(router);
268             assertEquals(Status.Down, router.getStatus());
269             assertTrue(router.getPeer().isEmpty());
270
271             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
272             final Monitor monitorInit = getBmpData(MONITOR_IID).get();
273             assertFalse(monitorInit.getRouter().isEmpty());
274             Router routerInit = null;
275             for (Router r : monitorInit.getRouter()) {
276                 if (routerId.equals(r.getRouterId())) {
277                     routerInit = r;
278                     break;
279                 }
280             }
281             assertNotNull(routerInit);
282             assertEquals("some info;", routerInit.getInfo());
283             assertEquals("name", routerInit.getName());
284             assertEquals("description", routerInit.getDescription());
285             assertEquals(routerId, routerInit.getRouterId());
286             assertTrue(routerInit.getPeer().isEmpty());
287             assertEquals(Status.Up, routerInit.getStatus());
288
289             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
290             final KeyedInstanceIdentifier<Router, RouterKey> routerIId = MONITOR_IID.child(Router.class, new RouterKey(routerId));
291             final List<Peer> peers = getBmpData(routerIId).get().getPeer();
292             assertEquals(1, peers.size());
293             final Peer peer = peers.get(0);
294             assertEquals(PeerType.Global, peer.getType());
295             assertEquals(PEER_ID, peer.getPeerId());
296             assertEquals(PEER1, peer.getBgpId());
297             assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
298             assertEquals(TestUtil.PEER_AS, peer.getAs());
299             assertNull(peer.getDistinguisher());
300             assertNull(peer.getStats());
301
302             assertNotNull(peer.getPrePolicyRib());
303             assertEquals(1, peer.getPrePolicyRib().getTables().size());
304             final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
305             assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
306             assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
307             assertFalse(prePolicyTable.getAttributes().isUptodate());
308             assertNotNull(prePolicyTable.getRoutes());
309
310             assertNotNull(peer.getPostPolicyRib());
311             assertEquals(1, peer.getPostPolicyRib().getTables().size());
312             final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
313             assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
314             assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
315             assertFalse(postPolicyTable.getAttributes().isUptodate());
316             assertNotNull(postPolicyTable.getRoutes());
317
318             assertNotNull(peer.getPeerSession());
319             final PeerSession peerSession = peer.getPeerSession();
320             assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
321             assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
322             assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
323             assertEquals(Status.Up, peerSession.getStatus());
324             assertNotNull(peerSession.getReceivedOpen());
325             assertNotNull(peerSession.getSentOpen());
326
327             final StatsReportsMessage statsMsg = TestUtil.createStatsReportMsg(PEER1);
328             waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
329             final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
330             final Stats peerStats = getBmpData(peerIId.child(Stats.class)).get();
331             assertNotNull(peerStats.getTimestampSec());
332             final Tlvs tlvs = statsMsg.getTlvs();
333             assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
334             assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(), peerStats.getDuplicatePrefixAdvertisements());
335             assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
336             assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
337             assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
338             assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(), peerStats.getInvalidatedClusterListLoop());
339             assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
340             assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
341             assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
342             assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(), peerStats.getPerAfiSafiAdjRibInRoutes().getAfiSafi().get(0).getCount().toString());
343             assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(), peerStats.getPerAfiSafiLocRibRoutes().getAfiSafi().get(0).getCount().toString());
344
345             // route mirror message test
346             final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
347             waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
348             final Mirrors routeMirrors = getBmpData(peerIId.child(Mirrors.class)).get();
349             assertNotNull(routeMirrors.getTimestampSec());
350
351             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy)));
352             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy)));
353             final Tables prePolicyRib = getBmpData(peerIId.child(PrePolicyRib.class)).get().getTables().get(0);
354             assertTrue(prePolicyRib.getAttributes().isUptodate());
355             assertEquals(3, ((Ipv4RoutesCase) prePolicyRib.getRoutes()).getIpv4Routes().getIpv4Route().size());
356
357             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy)));
358             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy)));
359             final Tables postPolicyRib = getBmpData(peerIId.child(PostPolicyRib.class)).get().getTables().get(0);
360             assertTrue(postPolicyRib.getAttributes().isUptodate());
361             assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase) postPolicyRib.getRoutes()).getIpv4Routes().getIpv4Route().size());
362
363             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
364             final List<Peer> peersAfterDown = getBmpData(routerIId).get().getPeer();
365             assertTrue(peersAfterDown.isEmpty());
366         } catch (final Exception e) {
367             final StringBuffer ex = new StringBuffer();
368             ex.append(e.getMessage()).append("\n");
369             for (final StackTraceElement element : e.getStackTrace()) {
370                 ex.append(element.toString() + "\n");
371             }
372             fail(ex.toString());
373         }
374         return channel;
375     }
376
377     @Test
378     public void deploySecondInstance() throws Exception {
379         final BmpMonitoringStation monitoringStation2 = BmpMonitoringStationImpl.createBmpMonitorInstance(new SimpleRIBExtensionProviderContext(), this.dispatcher, getDomBroker(),
380                 new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), Optional.of(new KeyMapping()),
381                 this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), this.mrs);
382         final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
383         Assert.assertEquals(2, monitor.getMonitor().size());
384         monitoringStation2.close();
385     }
386
387     private Channel connectTestClient(final String routerIp, final BmpMessageRegistry msgRegistry) throws InterruptedException {
388         final BmpHandlerFactory hf = new BmpHandlerFactory(msgRegistry);
389         final Bootstrap b = new Bootstrap();
390         b.group(new NioEventLoopGroup());
391         b.option(ChannelOption.SO_KEEPALIVE, true);
392         b.channelFactory(this.scfMd5);
393         b.handler(new ChannelInitializer<SocketChannel>() {
394             @Override
395             protected void initChannel(final SocketChannel ch) throws Exception {
396                 ch.pipeline().addLast(hf.getDecoders());
397                 ch.pipeline().addLast(hf.getEncoders());
398             }
399         });
400         b.localAddress(new InetSocketAddress(routerIp, 0));
401         b.option(ChannelOption.SO_REUSEADDR, true);
402         final ChannelFuture future = b.connect(new InetSocketAddress(MONITOR_LOCAL_ADDRESS, MONITOR_LOCAL_PORT)).sync();
403         waitFutureSuccess(future);
404         return future.channel();
405     }
406
407     private <T extends DataObject> Optional<T> getBmpData(final InstanceIdentifier<T> iid) throws ReadFailedException {
408         try (final ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
409             return tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
410         }
411     }
412
413     private RouterId getRouterId(String routerIp) {
414         return new RouterId(new IpAddress(new Ipv4Address(routerIp)));
415     }
416 }