BUG-6810: Fix intermintent Be BmpMonitorImplTest failure 85/46185/1
authorClaudio D. Gasparini <cgaspari@cisco.com>
Mon, 26 Sep 2016 11:34:32 +0000 (13:34 +0200)
committerClaudio D. Gasparini <cgaspari@cisco.com>
Mon, 26 Sep 2016 13:39:23 +0000 (15:39 +0200)
Fix intermintent Be BmpMonitorImplTest failure

Change-Id: I649b941276778d005c2de0c5aa016e087ba58f0b
Signed-off-by: Claudio D. Gasparini <cgaspari@cisco.com>
bgp/bmp-impl/src/test/java/org/opendaylight/protocol/bmp/impl/app/BmpMonitorImplTest.java

index 06216da7a3c157fa8c615f6a277da79580ca6945..b13fa6e49960a4d8a7600ceaa59c77960e55f244 100644 (file)
@@ -16,8 +16,11 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
 import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.Uninterruptibles;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -25,9 +28,14 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import javassist.ClassPool;
+import javax.annotation.Nullable;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -112,7 +120,8 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
 
     private static final int PORT = 12345;
     private static final String LOCAL_ADDRESS = "127.0.0.10";
-    private static final InetSocketAddress CLIENT_REMOTE = new InetSocketAddress("127.0.0.10", PORT);
+    private static final String LOCAL_ADDRESS_2 = "127.0.0.11";
+    private static final InetSocketAddress CLIENT_REMOTE = new InetSocketAddress(LOCAL_ADDRESS, PORT);
     private static final InetSocketAddress CLIENT_LOCAL = new InetSocketAddress(LOCAL_ADDRESS, 0);
     private static final Ipv4Address PEER1 = new Ipv4Address("20.20.20.20");
     private static final MonitorId MONITOR_ID = new MonitorId("monitor");
@@ -181,11 +190,17 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
                 MONITOR_ID, new InetSocketAddress(InetAddresses.forString("127.0.0.10"), PORT), Optional.of(keys),
                 this.mappingService.getCodecFactory(), moduleInfoBackedContext.getSchemaContext(), this.mrs);
 
-        final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
-        Assert.assertEquals(1, monitor.getMonitor().size());
-        final Monitor bmpMonitor = monitor.getMonitor().get(0);
-        Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
-        Assert.assertEquals(0, bmpMonitor.getRouter().size());
+        readData(InstanceIdentifier.create(BmpMonitor.class), new Function<BmpMonitor, Object>() {
+            @Nullable
+            @Override
+            public Object apply(@Nullable final BmpMonitor monitor) {
+                Assert.assertEquals(1, monitor.getMonitor().size());
+                final Monitor bmpMonitor = monitor.getMonitor().get(0);
+                Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
+                Assert.assertEquals(0, bmpMonitor.getRouter().size());
+                return monitor;
+            }
+        });
     }
 
     @After
@@ -196,8 +211,15 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
         this.dispatcher.close();
         this.bmpApp.close();
         this.mappingService.close();
-        final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
-        assertTrue(monitor.getMonitor().isEmpty());
+
+        readData(InstanceIdentifier.create(BmpMonitor.class), new Function<BmpMonitor, Object>() {
+            @Nullable
+            @Override
+            public Object apply(@Nullable final BmpMonitor monitor) {
+                assertTrue(monitor.getMonitor().isEmpty());
+                return monitor;
+            }
+        });
     }
 
     @Test
@@ -207,117 +229,167 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
             Thread.sleep(500);
             final KeyedInstanceIdentifier<Monitor, MonitorKey> monitorIId = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
 
-            final Monitor monitor = getBmpData(monitorIId).get();
-            assertEquals(1, monitor.getRouter().size());
-            final Router router = monitor.getRouter().get(0);
-            assertEquals(ROUTER_ID, router.getRouterId());
-            assertEquals(Status.Down, router.getStatus());
-            assertTrue(router.getPeer().isEmpty());
+            readData(monitorIId, new Function<Monitor, Object>() {
+                @Nullable
+                @Override
+                public Object apply(@Nullable final Monitor monitor) {
+                    assertEquals(1, monitor.getRouter().size());
+                    final Router router = monitor.getRouter().get(0);
+                    assertEquals(ROUTER_ID, router.getRouterId());
+                    assertEquals(Status.Down, router.getStatus());
+                    assertTrue(router.getPeer().isEmpty());
+                    return monitor;
+                }
+            });
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
+            readData(monitorIId, new Function<Monitor, Object>() {
+                @Nullable
+                @Override
+                public Object apply(@Nullable final Monitor monitor) {
+                    assertEquals(1, monitor.getRouter().size());
+                    final Router routerInit = monitor.getRouter().get(0);
+                    assertEquals("some info;", routerInit.getInfo());
+                    assertEquals("name", routerInit.getName());
+                    assertEquals("description", routerInit.getDescription());
+                    assertEquals(ROUTER_ID, routerInit.getRouterId());
+                    assertTrue(routerInit.getPeer().isEmpty());
+                    assertEquals(Status.Up, routerInit.getStatus());
+                    return monitor;
+                }
+            });
+
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
 
-            channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info"));
-            Thread.sleep(500);
-            final Monitor monitorInit = getBmpData(monitorIId).get();
-            assertEquals(1, monitorInit.getRouter().size());
-            final Router routerInit = monitorInit.getRouter().get(0);
-            assertEquals("some info;", routerInit.getInfo());
-            assertEquals("name", routerInit.getName());
-            assertEquals("description", routerInit.getDescription());
-            assertEquals(ROUTER_ID, routerInit.getRouterId());
-            assertTrue(routerInit.getPeer().isEmpty());
-            assertEquals(Status.Up, routerInit.getStatus());
-
-            channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true));
-            Thread.sleep(500);
             final KeyedInstanceIdentifier<Router, RouterKey> routerIId = monitorIId.child(Router.class, new RouterKey(ROUTER_ID));
-            final List<Peer> peers = getBmpData(routerIId).get().getPeer();
-            assertEquals(1, peers.size());
-            final Peer peer = peers.get(0);
-            assertEquals(PeerType.Global, peer.getType());
-            assertEquals(PEER_ID, peer.getPeerId());
-            assertEquals(PEER1, peer.getBgpId());
-            assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
-            assertEquals(TestUtil.PEER_AS, peer.getAs());
-            assertNull(peer.getDistinguisher());
-            assertNull(peer.getStats());
-
-            assertNotNull(peer.getPrePolicyRib());
-            assertEquals(1, peer.getPrePolicyRib().getTables().size());
-            final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
-            assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
-            assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
-            assertFalse(prePolicyTable.getAttributes().isUptodate());
-            assertNotNull(prePolicyTable.getRoutes());
-
-            assertNotNull(peer.getPostPolicyRib());
-            assertEquals(1, peer.getPostPolicyRib().getTables().size());
-            final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
-            assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
-            assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
-            assertFalse(postPolicyTable.getAttributes().isUptodate());
-            assertNotNull(postPolicyTable.getRoutes());
-
-            assertNotNull(peer.getPeerSession());
-            final PeerSession peerSession = peer.getPeerSession();
-            assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
-            assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
-            assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
-            assertEquals(Status.Up, peerSession.getStatus());
+            readData(routerIId, new Function<Router, Object>() {
+                @Nullable
+                @Override
+                public Object apply(@Nullable final Router router) {
+                    final List<Peer> peers = router.getPeer();
+                    assertEquals(1, peers.size());
+                    final Peer peer = peers.get(0);
+                    assertEquals(PeerType.Global, peer.getType());
+                    assertEquals(PEER_ID, peer.getPeerId());
+                    assertEquals(PEER1, peer.getBgpId());
+                    assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
+                    assertEquals(TestUtil.PEER_AS, peer.getAs());
+                    assertNull(peer.getDistinguisher());
+                    assertNull(peer.getStats());
+
+                    assertNotNull(peer.getPrePolicyRib());
+                    assertEquals(1, peer.getPrePolicyRib().getTables().size());
+                    final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
+                    assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
+                    assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
+                    assertFalse(prePolicyTable.getAttributes().isUptodate());
+                    assertNotNull(prePolicyTable.getRoutes());
+
+                    assertNotNull(peer.getPostPolicyRib());
+                    assertEquals(1, peer.getPostPolicyRib().getTables().size());
+                    final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
+                    assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
+                    assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
+                    assertFalse(postPolicyTable.getAttributes().isUptodate());
+                    assertNotNull(postPolicyTable.getRoutes());
+
+                    assertNotNull(peer.getPeerSession());
+                    final PeerSession peerSession = peer.getPeerSession();
+                    assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
+                    assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
+                    assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
+                    assertEquals(Status.Up, peerSession.getStatus());
+                    return router;
+                }
+            });
+
 
             final StatsReportsMessage statsMsg = TestUtil.createStatsReportMsg(PEER1);
-            channel.writeAndFlush(statsMsg);
-            Thread.sleep(500);
+            waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
             final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
-            final Stats peerStats = getBmpData(peerIId.child(Stats.class)).get();
-            assertNotNull(peerStats.getTimestampSec());
-            final Tlvs tlvs = statsMsg.getTlvs();
-            assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
-            assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(), peerStats.getDuplicatePrefixAdvertisements());
-            assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
-            assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
-            assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
-            assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(), peerStats.getInvalidatedClusterListLoop());
-            assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
-            assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
-            assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
-            assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(), peerStats.getPerAfiSafiAdjRibInRoutes().getAfiSafi().get(0).getCount().toString());
-            assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(), peerStats.getPerAfiSafiLocRibRoutes().getAfiSafi().get(0).getCount().toString());
-
-            // route mirror message test
+            readData(peerIId.child(Stats.class), new Function<Stats, Object>() {
+                @Nullable
+                @Override
+                public Object apply(@Nullable final Stats peerStats) {
+                    assertNotNull(peerStats.getTimestampSec());
+                    final Tlvs tlvs = statsMsg.getTlvs();
+                    assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
+                    assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(), peerStats.getDuplicatePrefixAdvertisements());
+                    assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
+                    assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
+                    assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
+                    assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(), peerStats.getInvalidatedClusterListLoop());
+                    assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
+                    assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
+                    assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
+                    assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(), peerStats.getPerAfiSafiAdjRibInRoutes().getAfiSafi().get(0).getCount().toString());
+                    assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(), peerStats.getPerAfiSafiLocRibRoutes().getAfiSafi().get(0).getCount().toString());
+                    return peerStats;
+                }
+            });
+         // route mirror message test
             final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
-            channel.writeAndFlush(routeMirrorMsg);
-            Thread.sleep(500);
-            final Mirrors routeMirrors = getBmpData(peerIId.child(Mirrors.class)).get();
-            assertNotNull(routeMirrors.getTimestampSec());
-
-            channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy));
-            channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy));
-            Thread.sleep(500);
-            final Tables prePolicyRib = getBmpData(peerIId.child(PrePolicyRib.class)).get().getTables().get(0);
-            assertTrue(prePolicyRib.getAttributes().isUptodate());
-            assertEquals(3, ((Ipv4RoutesCase) prePolicyRib.getRoutes()).getIpv4Routes().getIpv4Route().size());
-
-            channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy));
-            channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy));
-            Thread.sleep(500);
-            final Tables postPolicyRib = getBmpData(peerIId.child(PostPolicyRib.class)).get().getTables().get(0);
-            assertTrue(postPolicyRib.getAttributes().isUptodate());
-            assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase) postPolicyRib.getRoutes()).getIpv4Routes().getIpv4Route().size());
-
-            channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1));
-            Thread.sleep(500);
-            final List<Peer> peersAfterDown = getBmpData(routerIId).get().getPeer();
-            assertTrue(peersAfterDown.isEmpty());
+            waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
+            readData(peerIId.child(Mirrors.class), new Function<Mirrors, Object>() {
+                @Nullable
+                @Override
+                public Object apply(@Nullable final Mirrors routeMirrors) {
+                    assertNotNull(routeMirrors.getTimestampSec());
+                    return routeMirrors;
+                }
+            });
+
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy)));
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy)));
+            readData(peerIId.child(PrePolicyRib.class), new Function<PrePolicyRib, Object>() {
+                @Nullable
+                @Override
+                public Object apply(@Nullable final PrePolicyRib prePolicyRib) {
+                    final Tables table = prePolicyRib.getTables().get(0);
+                    assertTrue(table.getAttributes().isUptodate());
+                    assertEquals(3, ((Ipv4RoutesCase) table.getRoutes()).getIpv4Routes().getIpv4Route().size());
+                    return prePolicyRib;
+                }
+            });
+
+
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy)));
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy)));
+            readData(peerIId.child(PostPolicyRib.class), new Function<PostPolicyRib, Object>() {
+                @Nullable
+                @Override
+                public Object apply(@Nullable final PostPolicyRib postPolicyRib) {
+                    final Tables tables = postPolicyRib.getTables().get(0);
+                    assertTrue(tables.getAttributes().isUptodate());
+                    assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase) tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
+                    return postPolicyRib;
+                }
+            });
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
+            readData(routerIId, new Function<Router, Object>() {
+                @Nullable
+                @Override
+                public Object apply(@Nullable final Router peersAfterDown) {
+                    assertTrue(peersAfterDown.getPeer().isEmpty());
+                    return peersAfterDown;
+                }
+            });
 
             channel.close().await();
-            Thread.sleep(500);
-            final Monitor monitorAfterClose = getBmpData(monitorIId).get();
-            assertTrue(monitorAfterClose.getRouter().isEmpty());
+            readData(monitorIId, new Function<Monitor, Object>() {
+                @Nullable
+                @Override
+                public Object apply(@Nullable final Monitor monitorAfterClose) {
+                    assertTrue(monitorAfterClose.getRouter().isEmpty());
+                    return monitorAfterClose;
+                }
+            });
+
         } catch (final Exception e) {
             final StringBuffer ex = new StringBuffer();
             ex.append(e.getMessage() + "\n");
             for (final StackTraceElement element: e.getStackTrace()) {
                 ex.append(element.toString() + "\n");
-            };
+            }
             fail(ex.toString());
         }
     }
@@ -325,10 +397,18 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
     @Test
     public void deploySecondInstance() throws Exception {
         final BmpMonitoringStation monitoringStation2 = BmpMonitoringStationImpl.createBmpMonitorInstance(new SimpleRIBExtensionProviderContext(), this.dispatcher, getDomBroker(),
-                new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString("127.0.0.11"), PORT), Optional.of(new KeyMapping()),
-                this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), this.mrs);
-        final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
-        Assert.assertEquals(2, monitor.getMonitor().size());
+                new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(LOCAL_ADDRESS_2), PORT), Optional.of(new KeyMapping()),
+                this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
+
+        readData(InstanceIdentifier.create(BmpMonitor.class), new Function<BmpMonitor, Object>() {
+            @Nullable
+            @Override
+            public Object apply(@Nullable final BmpMonitor monitor) {
+                Assert.assertEquals(2, monitor.getMonitor().size());
+                return monitor;
+            }
+        });
+
         monitoringStation2.close();
     }
 
@@ -355,4 +435,40 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
             return tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
         }
     }
+
+    private <R, T extends DataObject> R readData(final InstanceIdentifier<T> iid, final Function<T, R> function)
+        throws ReadFailedException {
+        AssertionError lastError = null;
+        final Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+            try (final ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
+                final Optional<T> data = tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
+                if(data.isPresent()) {
+                    try {
+                        return function.apply(data.get());
+                    } catch (final AssertionError e) {
+                        lastError = e;
+                        Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+                    }
+                }
+            }
+        }
+
+        throw lastError;
+    }
+
+    private static <T extends Future> void waitFutureSuccess(final T future) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        future.addListener(new FutureListener<T>() {
+            @Override
+            public void operationComplete(final Future future) throws Exception {
+                latch.countDown();
+            }
+        });
+        Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
+    }
+
+    private void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
+        waitFutureSuccess(channelFuture);
+    }
 }