import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.opendaylight.protocol.bgp.rib.impl.CheckUtil.readData;
+import static org.opendaylight.protocol.bgp.rib.impl.CheckUtil.waitFutureSuccess;
import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
import com.google.common.net.InetAddresses;
-import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.Future;
import java.net.InetSocketAddress;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import javassist.ClassPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.protocol.bgp.inet.RIBActivator;
import org.opendaylight.protocol.bgp.parser.impl.BGPActivator;
import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
-import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
private static final String MD5_PASSWORD = "abcdef";
-
+ private static final InstanceIdentifier<BmpMonitor> BMP_II = InstanceIdentifier.create(BmpMonitor.class);
private BindingToNormalizedNodeCodec mappingService;
private RIBActivator ribActivator;
private BGPActivator bgpActivator;
MONITOR_ID, new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MONITOR_LOCAL_PORT), Optional.of(keys),
this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
- readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+ readData(getDataBroker(), BMP_II, monitor -> {
Assert.assertEquals(1, monitor.getMonitor().size());
final Monitor bmpMonitor = monitor.getMonitor().get(0);
Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
this.bmpApp.close();
this.mappingService.close();
- readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+ readData(getDataBroker(), BMP_II, monitor -> {
assertTrue(monitor.getMonitor().isEmpty());
return monitor;
});
public void testRouterMonitoring() throws Exception {
// first test if a single router monitoring is working
final Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertEquals(1, monitor.getRouter().size());
return monitor;
});
final Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertEquals(2, monitor.getRouter().size());
return monitor;
});
waitFutureSuccess(channel1.close());
// channel 2 is still open
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertEquals(1, monitor.getRouter().size());
return monitor;
});
final Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertEquals(2, monitor.getRouter().size());
return monitor;
});
// sleep for a while to avoid intermittent InMemoryDataTree modification conflict
waitFutureSuccess(channel4.close());
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertEquals(0, monitor.getRouter().size());
return monitor;
});
}
- private static <T extends Future> void waitFutureSuccess(final T future) throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
- future.addListener(future1 -> latch.countDown());
- Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
- }
-
private void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
waitFutureSuccess(channelFuture);
}
final Channel channel = connectTestClient(remoteRouterIpAddr, this.msgRegistry);
final RouterId routerId = getRouterId(remoteRouterIpAddr);
try {
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertFalse(monitor.getRouter().isEmpty());
// now find the current router instance
Router router = null;
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertFalse(monitor.getRouter().isEmpty());
Router retRouter = null;
for (final Router r : monitor.getRouter()) {
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
final KeyedInstanceIdentifier<Router, RouterKey> routerIId = MONITOR_IID.child(Router.class, new RouterKey(routerId));
- readData(routerIId, router -> {
+ readData(getDataBroker(), routerIId, router -> {
final List<Peer> peers = router.getPeer();
assertEquals(1, peers.size());
final Peer peer = peers.get(0);
waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
- readData(peerIId.child(Stats.class), peerStats -> {
+ readData(getDataBroker(), peerIId.child(Stats.class), peerStats -> {
assertNotNull(peerStats.getTimestampSec());
final Tlvs tlvs = statsMsg.getTlvs();
assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
- readData(peerIId.child(Mirrors.class), routeMirrors -> {
+ readData(getDataBroker(), peerIId.child(Mirrors.class), routeMirrors -> {
assertNotNull(routeMirrors.getTimestampSec());
return routeMirrors;
});
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy)));
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy)));
- readData(peerIId.child(PrePolicyRib.class), prePolicyRib -> {
+ readData(getDataBroker(), peerIId.child(PrePolicyRib.class), prePolicyRib -> {
assertTrue(!prePolicyRib.getTables().isEmpty());
final Tables tables = prePolicyRib.getTables().get(0);
assertTrue(tables.getAttributes().isUptodate());
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy)));
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy)));
- readData(peerIId.child(PostPolicyRib.class), postPolicyRib -> {
+ readData(getDataBroker(), peerIId.child(PostPolicyRib.class), postPolicyRib -> {
assertTrue(!postPolicyRib.getTables().isEmpty());
final Tables tables = postPolicyRib.getTables().get(0);
assertTrue(tables.getAttributes().isUptodate());
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
- readData(routerIId, router -> {
+ readData(getDataBroker(), routerIId, router -> {
final List<Peer> peersAfterDown = router.getPeer();
assertTrue(peersAfterDown.isEmpty());
return router;
new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), Optional.of(KeyMapping.getKeyMapping()),
this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
- readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+ readData(getDataBroker(), BMP_II, monitor -> {
Assert.assertEquals(2, monitor.getMonitor().size());
return monitor;
});
return future.channel();
}
- private <R, T extends DataObject> R readData(final InstanceIdentifier<T> iid, final Function<T, R> function)
- throws ReadFailedException {
- AssertionError lastError = null;
- final Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
- try (final ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
- final Optional<T> data = tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
- if(data.isPresent()) {
- try {
- return function.apply(data.get());
- } catch (final AssertionError e) {
- lastError = e;
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- }
- }
- }
- }
-
- throw lastError;
- }
-
private RouterId getRouterId(final String routerIp) {
return new RouterId(new IpAddress(new Ipv4Address(routerIp)));
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.netty.util.concurrent.Future;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.junit.Assert;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public final class CheckUtil {
+ private static final int TIMEOUT = 40;
+ private static final int LATCH_TIMEOUT = 10;
+ private static final int SLEEP_FOR = 20;
+ private static final int SLEEP_UNINTERRUPTIBLY = 50;
+ public static void checkReceivedMessages(final SimpleSessionListener listener, final int numberOfMessages)
+ throws ReadFailedException {
+ Stopwatch sw = Stopwatch.createStarted();
+ while (sw.elapsed(TimeUnit.SECONDS) <= TIMEOUT) {
+ if (listener.getListMsg().size() != numberOfMessages) {
+ Uninterruptibles.sleepUninterruptibly(SLEEP_UNINTERRUPTIBLY, TimeUnit.MILLISECONDS);
+ } else {
+ return;
+ }
+ }
+ Assert.fail();
+ }
+
+ public static <R, T extends DataObject> R readData(final DataBroker dataBroker, final InstanceIdentifier<T> iid, final Function<T, R> function)
+ throws ReadFailedException {
+ AssertionError lastError = null;
+ final Stopwatch sw = Stopwatch.createStarted();
+ while (sw.elapsed(TimeUnit.SECONDS) <= TIMEOUT) {
+ try (final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction()) {
+ final Optional<T> data = tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
+ if (data.isPresent()) {
+ try {
+ return function.apply(data.get());
+ } catch (final AssertionError e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(SLEEP_FOR, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }
+ Assert.fail(lastError.getMessage());
+ throw lastError;
+ }
+
+
+ public static <T extends Future> void waitFutureSuccess(final T future) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ future.addListener(future1 -> latch.countDown());
+ Uninterruptibles.awaitUninterruptibly(latch, LATCH_TIMEOUT, TimeUnit.SECONDS);
+ }
+}
\ No newline at end of file