BGPCEP-706: Fix BGP Flowspec NumbericOphrand
[bgpcep.git] / bgp / bmp-impl / src / test / java / org / opendaylight / protocol / bmp / impl / app / BmpMonitorImplTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.protocol.bmp.impl.app;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.doAnswer;
19 import static org.opendaylight.protocol.util.CheckUtil.readDataOperational;
20 import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess;
21
22 import com.google.common.net.InetAddresses;
23 import io.netty.bootstrap.Bootstrap;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelInitializer;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.EventLoopGroup;
29 import io.netty.channel.epoll.Epoll;
30 import io.netty.channel.epoll.EpollEventLoopGroup;
31 import io.netty.channel.epoll.EpollSocketChannel;
32 import io.netty.channel.nio.NioEventLoopGroup;
33 import io.netty.channel.socket.SocketChannel;
34 import io.netty.channel.socket.nio.NioSocketChannel;
35 import java.net.InetSocketAddress;
36 import java.util.List;
37 import javassist.ClassPool;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.mockito.Mock;
42 import org.mockito.MockitoAnnotations;
43 import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
44 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
45 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
46 import org.opendaylight.mdsal.binding.dom.adapter.BindingToNormalizedNodeCodec;
47 import org.opendaylight.mdsal.binding.dom.codec.gen.impl.StreamWriterGenerator;
48 import org.opendaylight.mdsal.binding.dom.codec.impl.BindingNormalizedNodeCodecRegistry;
49 import org.opendaylight.mdsal.binding.generator.impl.GeneratedClassLoadingStrategy;
50 import org.opendaylight.mdsal.binding.generator.impl.ModuleInfoBackedContext;
51 import org.opendaylight.mdsal.binding.generator.util.JavassistUtils;
52 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
53 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
54 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
55 import org.opendaylight.protocol.bgp.inet.RIBActivator;
56 import org.opendaylight.protocol.bgp.parser.impl.BGPActivator;
57 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
58 import org.opendaylight.protocol.bgp.parser.spi.pojo.SimpleBGPExtensionProviderContext;
59 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionProviderContext;
60 import org.opendaylight.protocol.bgp.rib.spi.SimpleRIBExtensionProviderContext;
61 import org.opendaylight.protocol.bmp.api.BmpDispatcher;
62 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
63 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
64 import org.opendaylight.protocol.bmp.impl.config.BmpDeployerDependencies;
65 import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
66 import org.opendaylight.protocol.bmp.impl.spi.BmpMonitoringStation;
67 import org.opendaylight.protocol.bmp.parser.BmpActivator;
68 import org.opendaylight.protocol.bmp.parser.message.TestUtil;
69 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
70 import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
71 import org.opendaylight.protocol.util.CheckUtil;
72 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
73 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.pre.policy.rib.tables.routes.Ipv4RoutesCase;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.update.attributes.mp.reach.nlri.advertized.routes.destination.type.DestinationIpv4Case;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.BgpParameters;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.MultiprotocolCapability;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.update.attributes.mp.reach.nlri.AdvertizedRoutes;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.PeerId;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.Ipv4AddressFamily;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.AdjRibInType;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.InitiationMessage;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.PeerType;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.RouteMirroringMessage;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.StatsReportsMessage;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.peer.up.ReceivedOpen;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.peer.up.SentOpen;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.stat.Tlvs;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.BmpMonitor;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.MonitorId;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.RouterId;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.Status;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.bmp.monitor.Monitor;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.bmp.monitor.MonitorKey;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.Peer;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.PeerKey;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.Mirrors;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PeerSession;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PostPolicyRib;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PrePolicyRib;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.Stats;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.routers.Router;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.routers.RouterKey;
107 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
108 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
111 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
112 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
113 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115
116 public class BmpMonitorImplTest extends AbstractConcurrentDataBrokerTest {
117     // the local port and address where the monitor (ODL) will listen for incoming BMP request
118     private static final int MONITOR_LOCAL_PORT = 12345;
119     private static final String MONITOR_LOCAL_ADDRESS = "127.0.0.10";
120     private static final String MONITOR_LOCAL_ADDRESS_2 = "127.0.0.11";
121     // the router (monitee) address where we are going to simulate a BMP request from
122     private static final String REMOTE_ROUTER_ADDRESS_1 = "127.0.0.12";
123     private static final String REMOTE_ROUTER_ADDRESS_2 = "127.0.0.13";
124     private static final Ipv4Address PEER1 = new Ipv4Address("20.20.20.20");
125     private static final MonitorId MONITOR_ID = new MonitorId("monitor");
126     private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID = InstanceIdentifier
127         .create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
128     private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
129     private static final InstanceIdentifier<BmpMonitor> BMP_II = InstanceIdentifier.create(BmpMonitor.class);
130     private BindingToNormalizedNodeCodec mappingService;
131     private RIBActivator ribActivator;
132     private BGPActivator bgpActivator;
133     private BmpActivator bmpActivator;
134     private BmpDispatcher dispatcher;
135     private BmpMonitoringStation bmpApp;
136     private BmpMessageRegistry msgRegistry;
137     private RIBExtensionProviderContext ribExtension;
138     private ClusterSingletonService singletonService;
139     private ClusterSingletonService singletonService2;
140     @Mock
141     private ClusterSingletonServiceRegistration singletonServiceRegistration;
142     @Mock
143     private ClusterSingletonServiceRegistration singletonServiceRegistration2;
144     @Mock
145     private ClusterSingletonServiceProvider clusterSSProv;
146     @Mock
147     private ClusterSingletonServiceProvider clusterSSProv2;
148     @Before
149     public void setUp() throws Exception {
150         MockitoAnnotations.initMocks(this);
151
152         doAnswer(invocationOnMock -> {
153             BmpMonitorImplTest.this.singletonService = (ClusterSingletonService) invocationOnMock.getArguments()[0];
154             this.singletonService.instantiateServiceInstance();
155             return BmpMonitorImplTest.this.singletonServiceRegistration;
156         }).when(this.clusterSSProv).registerClusterSingletonService(any(ClusterSingletonService.class));
157
158         doAnswer(invocationOnMock -> BmpMonitorImplTest.this.singletonService.closeServiceInstance())
159             .when(this.singletonServiceRegistration).close();
160
161         doAnswer(invocationOnMock -> {
162             this.singletonService2 = (ClusterSingletonService) invocationOnMock.getArguments()[0];
163             this.singletonService2.instantiateServiceInstance();
164             return BmpMonitorImplTest.this.singletonServiceRegistration2;
165         }).when(this.clusterSSProv2).registerClusterSingletonService(any(ClusterSingletonService.class));
166
167         doAnswer(invocationOnMock -> BmpMonitorImplTest.this.singletonService2.closeServiceInstance())
168             .when(this.singletonServiceRegistration2).close();
169
170         this.mappingService = new BindingToNormalizedNodeCodec(GeneratedClassLoadingStrategy.getTCCLClassLoadingStrategy(),
171             new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(JavassistUtils.forClassPool(ClassPool.getDefault()))));
172         final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
173         moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(InitiationMessage.class));
174         moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(CParameters1.class));
175         moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(BgpParameters.class));
176         moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(MultiprotocolCapability.class));
177         moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(DestinationIpv4Case.class));
178         moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(AdvertizedRoutes.class));
179         moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(SentOpen.class));
180         moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(ReceivedOpen.class));
181         this.mappingService.onGlobalContextUpdated(moduleInfoBackedContext.tryToCreateSchemaContext().get());
182         this.ribActivator = new RIBActivator();
183         this.ribExtension = new SimpleRIBExtensionProviderContext();
184         this.ribActivator.startRIBExtensionProvider(this.ribExtension);
185
186         this.bgpActivator = new BGPActivator();
187         final BGPExtensionProviderContext context = new SimpleBGPExtensionProviderContext();
188         this.bgpActivator.start(context);
189         final SimpleBmpExtensionProviderContext ctx = new SimpleBmpExtensionProviderContext();
190         this.bmpActivator = new BmpActivator(context);
191         this.bmpActivator.start(ctx);
192         this.msgRegistry = ctx.getBmpMessageRegistry();
193
194         this.dispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
195             ctx.getBmpMessageRegistry(), new DefaultBmpSessionFactory());
196
197         final InetSocketAddress inetAddress = new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS),
198             MONITOR_LOCAL_PORT);
199
200         final DOMDataWriteTransaction wTx = getDomBroker().newWriteOnlyTransaction();
201         final ContainerNode parentNode = Builders.containerBuilder().withNodeIdentifier(
202             new NodeIdentifier(BmpMonitor.QNAME)).addChild(ImmutableNodes.mapNodeBuilder(Monitor.QNAME).build()).build();
203         wTx.merge(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(BmpMonitor.QNAME), parentNode);
204         wTx.submit();
205
206         final BmpDeployerDependencies bmpDependecies = new BmpDeployerDependencies(getDataBroker(), getDomBroker(),
207             this.ribExtension, this.mappingService.getCodecFactory(), getSchemaContext(), this.clusterSSProv);
208         this.bmpApp = new BmpMonitoringStationImpl(bmpDependecies, this.dispatcher, MONITOR_ID, inetAddress, null);
209         readDataOperational(getDataBroker(), BMP_II, monitor -> {
210             assertEquals(1, monitor.getMonitor().size());
211             final Monitor bmpMonitor = monitor.getMonitor().get(0);
212             assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
213             assertEquals(0, bmpMonitor.getRouter().size());
214             assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
215             assertEquals(0, bmpMonitor.getRouter().size());
216             return monitor;
217         });
218     }
219
220     @After
221     public void tearDown() throws Exception {
222         this.ribActivator.close();
223         this.bgpActivator.close();
224         this.bmpActivator.close();
225         this.dispatcher.close();
226         this.bmpApp.close();
227         this.mappingService.close();
228
229         readDataOperational(getDataBroker(), BMP_II, monitor -> {
230             assertTrue(monitor.getMonitor().isEmpty());
231             return monitor;
232         });
233     }
234
235     @Test
236     public void testRouterMonitoring() throws Exception {
237         // first test if a single router monitoring is working
238         final Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
239         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
240             assertEquals(1, monitor.getRouter().size());
241             return monitor;
242         });
243
244         final Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
245         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
246             assertEquals(2, monitor.getRouter().size());
247             return monitor;
248         });
249
250         // initiate another BMP request from router 1, create a redundant connection
251         // we expect the connection to be closed
252         final Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, this.msgRegistry);
253
254
255         // channel 1 should still be open, while channel3 should be closed
256         CheckUtil.checkEquals(() -> assertTrue(channel1.isOpen()));
257         CheckUtil.checkEquals(() -> assertFalse(channel3.isOpen()));
258
259         // now if we close the channel 1 and try it again, it should succeed
260         waitFutureSuccess(channel1.close());
261
262         // channel 2 is still open
263         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
264             assertEquals(1, monitor.getRouter().size());
265             return monitor;
266         });
267
268         final Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
269         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
270             assertEquals(2, monitor.getRouter().size());
271             return monitor;
272         });
273
274         // close all channel altogether
275         waitFutureSuccess(channel2.close());
276         Thread.sleep(100);
277
278         // sleep for a while to avoid intermittent InMemoryDataTree modification conflict
279         waitFutureSuccess(channel4.close());
280
281         readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
282             assertEquals(0, monitor.getRouter().size());
283             return monitor;
284         });
285     }
286
287     private static void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) {
288         waitFutureSuccess(channelFuture);
289     }
290
291     private Channel testMonitoringStation(final String remoteRouterIpAddr) throws InterruptedException {
292         final Channel channel = connectTestClient(remoteRouterIpAddr, this.msgRegistry);
293         final RouterId routerId = getRouterId(remoteRouterIpAddr);
294         try {
295             readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
296                 assertFalse(monitor.getRouter().isEmpty());
297                 // now find the current router instance
298                 Router router = null;
299                 for (final Router r : monitor.getRouter()) {
300                     if (routerId.equals(r.getRouterId())) {
301                         router = r;
302                         break;
303                     }
304                 }
305                 assertNotNull(router);
306                 assertEquals(Status.Down, router.getStatus());
307                 assertTrue(router.getPeer().isEmpty());
308                 return router;
309             });
310
311             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
312
313             readDataOperational(getDataBroker(), MONITOR_IID, monitor -> {
314                 assertFalse(monitor.getRouter().isEmpty());
315                 Router retRouter = null;
316                 for (final Router r : monitor.getRouter()) {
317                     if (routerId.equals(r.getRouterId())) {
318                         retRouter = r;
319                         break;
320                     }
321                 }
322
323                 assertEquals("some info;", retRouter.getInfo());
324                 assertEquals("name", retRouter.getName());
325                 assertEquals("description", retRouter.getDescription());
326                 assertEquals(routerId, retRouter.getRouterId());
327                 assertTrue(retRouter.getPeer().isEmpty());
328                 assertEquals(Status.Up, retRouter.getStatus());
329                 return retRouter;
330             });
331
332             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
333             final KeyedInstanceIdentifier<Router, RouterKey> routerIId = MONITOR_IID.child(Router.class, new RouterKey(routerId));
334
335             readDataOperational(getDataBroker(), routerIId, router -> {
336                 final List<Peer> peers = router.getPeer();
337                 assertEquals(1, peers.size());
338                 final Peer peer = peers.get(0);
339                 assertEquals(PeerType.Global, peer.getType());
340                 assertEquals(PEER_ID, peer.getPeerId());
341                 assertEquals(PEER1, peer.getBgpId());
342                 assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
343                 assertEquals(TestUtil.PEER_AS, peer.getAs());
344                 assertNull(peer.getPeerDistinguisher());
345                 assertNull(peer.getStats());
346
347                 assertNotNull(peer.getPrePolicyRib());
348                 assertEquals(1, peer.getPrePolicyRib().getTables().size());
349                 final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
350                 assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
351                 assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
352                 assertFalse(prePolicyTable.getAttributes().isUptodate());
353                 assertNotNull(prePolicyTable.getRoutes());
354
355                 assertNotNull(peer.getPostPolicyRib());
356                 assertEquals(1, peer.getPostPolicyRib().getTables().size());
357                 final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
358                 assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
359                 assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
360                 assertFalse(postPolicyTable.getAttributes().isUptodate());
361                 assertNotNull(postPolicyTable.getRoutes());
362
363                 assertNotNull(peer.getPeerSession());
364                 final PeerSession peerSession = peer.getPeerSession();
365                 assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
366                 assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
367                 assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
368                 assertEquals(Status.Up, peerSession.getStatus());
369                 assertNotNull(peerSession.getReceivedOpen());
370                 assertNotNull(peerSession.getSentOpen());
371                 return router;
372             });
373
374
375             final StatsReportsMessage statsMsg = TestUtil.createStatsReportMsg(PEER1);
376             waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
377             final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
378
379             readDataOperational(getDataBroker(), peerIId.child(Stats.class), peerStats -> {
380                 assertNotNull(peerStats.getTimestampSec());
381                 final Tlvs tlvs = statsMsg.getTlvs();
382                 assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
383                 assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(), peerStats.getDuplicatePrefixAdvertisements());
384                 assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
385                 assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
386                 assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
387                 assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(), peerStats.getInvalidatedClusterListLoop());
388                 assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
389                 assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
390                 assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
391                 assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(), peerStats.getPerAfiSafiAdjRibInRoutes().getAfiSafi().get(0).getCount().toString());
392                 assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(), peerStats.getPerAfiSafiLocRibRoutes().getAfiSafi().get(0).getCount().toString());
393                 return peerStats;
394             });
395
396             // route mirror message test
397             final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
398             waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
399
400             readDataOperational(getDataBroker(), peerIId.child(Mirrors.class), routeMirrors -> {
401                 assertNotNull(routeMirrors.getTimestampSec());
402                 return routeMirrors;
403             });
404
405             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy)));
406             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy)));
407
408             readDataOperational(getDataBroker(), peerIId.child(PrePolicyRib.class), prePolicyRib -> {
409                 assertTrue(!prePolicyRib.getTables().isEmpty());
410                 final Tables tables = prePolicyRib.getTables().get(0);
411                 assertTrue(tables.getAttributes().isUptodate());
412                 assertEquals(3, ((Ipv4RoutesCase) tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
413                 return tables;
414             });
415
416             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy)));
417             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy)));
418
419             readDataOperational(getDataBroker(), peerIId.child(PostPolicyRib.class), postPolicyRib -> {
420                 assertTrue(!postPolicyRib.getTables().isEmpty());
421                 final Tables tables = postPolicyRib.getTables().get(0);
422                 assertTrue(tables.getAttributes().isUptodate());
423                 assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase)
424                     tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
425                 return tables;
426             });
427
428             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
429
430             readDataOperational(getDataBroker(), routerIId, router -> {
431                 final List<Peer> peersAfterDown = router.getPeer();
432                 assertTrue(peersAfterDown.isEmpty());
433                 return router;
434             });
435         } catch (final Exception e) {
436             final StringBuilder ex = new StringBuilder();
437             ex.append(e.getMessage()).append("\n");
438             for (final StackTraceElement element : e.getStackTrace()) {
439                 ex.append(element.toString()).append("\n");
440             }
441             fail(ex.toString());
442         }
443         return channel;
444     }
445
446     @Test
447     public void deploySecondInstance() throws Exception {
448         final BmpDeployerDependencies bmpDependecies = new BmpDeployerDependencies(getDataBroker(), getDomBroker(),
449             this.ribExtension, this.mappingService.getCodecFactory(), getSchemaContext(), this.clusterSSProv2);
450
451         final BmpMonitoringStation monitoringStation2 = new BmpMonitoringStationImpl(bmpDependecies,
452             this.dispatcher, new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.
453             forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), null);
454
455         readDataOperational(getDataBroker(), BMP_II, monitor -> {
456             assertEquals(2, monitor.getMonitor().size());
457             return monitor;
458         });
459
460         monitoringStation2.close();
461     }
462
463     private static Channel connectTestClient(final String routerIp, final BmpMessageRegistry msgRegistry)
464             throws InterruptedException {
465         final BmpHandlerFactory hf = new BmpHandlerFactory(msgRegistry);
466         final Bootstrap b = new Bootstrap();
467         final EventLoopGroup workerGroup;
468         if (Epoll.isAvailable()) {
469             b.channel(EpollSocketChannel.class);
470             workerGroup = new EpollEventLoopGroup();
471         } else {
472             b.channel(NioSocketChannel.class);
473             workerGroup = new NioEventLoopGroup();
474         }
475         b.group(workerGroup);
476         b.option(ChannelOption.SO_KEEPALIVE, true);
477         b.handler(new ChannelInitializer<SocketChannel>() {
478             @Override
479             protected void initChannel(final SocketChannel ch) throws Exception {
480                 ch.pipeline().addLast(hf.getDecoders());
481                 ch.pipeline().addLast(hf.getEncoders());
482             }
483         });
484         b.localAddress(new InetSocketAddress(routerIp, 0));
485         b.option(ChannelOption.SO_REUSEADDR, true);
486         final ChannelFuture future = b.connect(new InetSocketAddress(MONITOR_LOCAL_ADDRESS, MONITOR_LOCAL_PORT)).sync();
487         waitFutureSuccess(future);
488         return future.channel();
489     }
490
491     private static RouterId getRouterId(final String routerIp) {
492         return new RouterId(new IpAddress(new Ipv4Address(routerIp)));
493     }
494 }