Use reconnect strategy 80/2780/2
authorRobert Varga <rovarga@cisco.com>
Fri, 15 Nov 2013 14:48:15 +0000 (15:48 +0100)
committerRobert Varga <rovarga@cisco.com>
Fri, 15 Nov 2013 15:03:07 +0000 (16:03 +0100)
Change-Id: I74fae31dd14afd2d39889f56ed9787af12c6eef3
Signed-off-by: Robert Varga <rovarga@cisco.com>
bgp/rib-impl-config/src/main/java/org/opendaylight/controller/config/yang/bgp/rib/impl/RIBImplModule.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGP.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPPeer.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/spi/BGPDispatcher.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPImplTest.java
bgp/rib-mock/src/main/java/org/opendaylight/protocol/bgp/rib/mock/BGPMock.java

index 32250a68159a30dd14c366a426810e4fb7ea683f..9097274c36416b928cd3ada5d5a2c62e27479d29 100644 (file)
@@ -11,8 +11,6 @@ package org.opendaylight.controller.config.yang.bgp.rib.impl;
 
 import io.netty.util.concurrent.GlobalEventExecutor;
 
-import java.io.IOException;
-
 import org.opendaylight.controller.config.api.JmxAttributeValidationException;
 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
@@ -23,6 +21,8 @@ import org.opendaylight.protocol.bgp.rib.impl.BGP;
 import org.opendaylight.protocol.bgp.rib.impl.BGPPeer;
 import org.opendaylight.protocol.bgp.rib.impl.RIBImpl;
 import org.opendaylight.protocol.concepts.ListenerRegistration;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.protocol.framework.TimedReconnectStrategy;
 import org.osgi.framework.BundleContext;
 
@@ -71,17 +71,22 @@ org.opendaylight.controller.config.yang.bgp.rib.impl.AbstractRIBImplModule {
                BGP bgp = getBgpDependency();
                final BGPPeer peer = new BGPPeer(rib, "peer-" + bgp.toString());
 
-               try {
-                       final long reconnects = getReconnectAttempts();
-                       ListenerRegistration<BGPSessionListener> reg = bgp
-                                       .registerUpdateListener(peer, new TimedReconnectStrategy(
-                                                       GlobalEventExecutor.INSTANCE,
-                                                       getConnectionTimeout(), 5000, 1.0, null,
-                                                       reconnects, null));
-                       return new RibImplCloseable(reg, rib);
-               } catch (IOException e) {
-                       throw new RuntimeException("Failed to register with BGP", e);
-               }
+               final long reconnects = getReconnectAttempts();
+               ListenerRegistration<BGPSessionListener> reg = bgp
+                               .registerUpdateListener(peer,
+                                               new ReconnectStrategyFactory() {
+                                       @Override
+                                       public ReconnectStrategy createReconnectStrategy() {
+                                               return new TimedReconnectStrategy(
+                                                               GlobalEventExecutor.INSTANCE,
+                                                               getConnectionTimeout(), 5000, 1.0, null,
+                                                               reconnects, null);
+                                       }
+                               }, new TimedReconnectStrategy(
+                                               GlobalEventExecutor.INSTANCE,
+                                               getConnectionTimeout(), 5000, 1.0, null,
+                                               reconnects, null));
+               return new RibImplCloseable(reg, rib);
        }
 
        private static final class RibImplCloseable implements AutoCloseable {
index 6e7d494a2b7506de3f85d00aa054f8388d981d94..4665bff08015e37cfe7a0361ed75f8acf7300828 100644 (file)
@@ -12,6 +12,7 @@ import java.io.IOException;
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
 import org.opendaylight.protocol.concepts.ListenerRegistration;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 
 /**
  * BGP interface. At this time it only supports listening to changes seen by the backing device, typically a network
@@ -24,10 +25,12 @@ public interface BGP {
         * needs to be explicitly closed in order to stop receiving the updates.
         * 
         * @param listener {@link BGPSessionListener}
-        * @param strategy {@link ReconnectStrategy} to use for TCP-level retries
+        * @param tcpStrategyFactory {@link ReconnectStrategyFactory} to use for creating TCP-level retry strategies
+        * @param sessionStrategy {@link ReconnectStrategy} to use for session-level retries
         * @throws IllegalStateException if there is already a listener registered
         * @throws IOException if some IO error occurred
         * @return ListenerRegistration
         */
-       public ListenerRegistration<BGPSessionListener> registerUpdateListener(BGPSessionListener listener, ReconnectStrategy strategy) throws IOException;
+       public ListenerRegistration<BGPSessionListener> registerUpdateListener(
+                       BGPSessionListener listener, ReconnectStrategyFactory tcpStrategyFactory, ReconnectStrategy sessionStrategy);
 }
index 2e3f819e70e577150e5779d9d50199d1e9de573a..13b17b090b157a843f08e9fdd49391301b6a000f 100644 (file)
@@ -13,16 +13,18 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
+
+import java.net.InetSocketAddress;
+
 import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
 import org.opendaylight.protocol.framework.AbstractDispatcher;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 
-import java.net.InetSocketAddress;
-
 /**
  * Implementation of BGPDispatcher.
  */
@@ -31,7 +33,7 @@ public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl,
 
        private final BGPHandlerFactory hf;
 
-       public BGPDispatcherImpl(final BGPMessageFactory parser, EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+       public BGPDispatcherImpl(final BGPMessageFactory parser, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
                super(bossGroup, workerGroup);
                this.hf = new BGPHandlerFactory(parser);
        }
@@ -56,6 +58,29 @@ public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl,
                });
        }
 
+       @Override
+       public Future<Void> createReconnectingClient(final InetSocketAddress address,
+                       final BGPSessionPreferences preferences, final BGPSessionListener listener,
+                       final ReconnectStrategyFactory connectStrategyFactory,
+                       final ReconnectStrategy reestablishStrategy) {
+               final BGPSessionNegotiatorFactory snf = new BGPSessionNegotiatorFactory(this.timer, preferences);
+               final SessionListenerFactory<BGPSessionListener> slf = new SessionListenerFactory<BGPSessionListener>() {
+                       @Override
+                       public BGPSessionListener getSessionListener() {
+                               return listener;
+                       }
+               };
+
+               return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy, new PipelineInitializer<BGPSessionImpl>() {
+                       @Override
+                       public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
+                               ch.pipeline().addLast(hf.getDecoders());
+                               ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(slf, ch, promise));
+                               ch.pipeline().addLast(hf.getEncoders());
+                       }
+               });
+       }
+
        @Override
        public void close() {
        }
index a963fc5d956596df6b199f91963e39e996629702..23db57cfe66c1bbe6565459399877021255b6780 100644 (file)
@@ -7,17 +7,17 @@
  */
 package org.opendaylight.protocol.bgp.rib.impl;
 
+import io.netty.util.concurrent.Future;
+
 import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutionException;
 
-import org.opendaylight.protocol.bgp.parser.BGPSession;
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionProposal;
 import org.opendaylight.protocol.concepts.ListenerRegistration;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -25,23 +25,6 @@ import com.google.common.base.Preconditions;
  * Implementation of {@link BGP}.
  */
 public class BGPImpl implements BGP, Closeable {
-       /**
-        * Wrapper class to give listener a close method.
-        */
-       public class BGPListenerRegistration extends ListenerRegistration<BGPSessionListener> {
-               private final BGPSession session;
-
-               public BGPListenerRegistration(final BGPSessionListener l, final BGPSession session) {
-                       super(l);
-                       this.session = session;
-               }
-
-               @Override
-               public void removeRegistration() {
-                       this.session.close();
-               }
-       }
-
        private final BGPDispatcher dispatcher;
 
        private final InetSocketAddress address;
@@ -54,18 +37,15 @@ public class BGPImpl implements BGP, Closeable {
                this.proposal = Preconditions.checkNotNull(proposal);
        }
 
-       /**
-        * {@inheritDoc}
-        */
        @Override
-       public BGPListenerRegistration registerUpdateListener(final BGPSessionListener listener, final ReconnectStrategy strategy) throws IOException {
-               final BGPSession session;
-               try {
-                       session = this.dispatcher.createClient(this.address, this.proposal.getProposal(), listener, strategy).get();
-               } catch (InterruptedException | ExecutionException e) {
-                       throw new IOException("Failed to connect to peer", e);
-               }
-               return new BGPListenerRegistration(listener, session);
+       public ListenerRegistration<BGPSessionListener> registerUpdateListener(final BGPSessionListener listener, final ReconnectStrategyFactory tcpStrategyFactory, final ReconnectStrategy sessionStrategy) {
+               final Future<Void> s = this.dispatcher.createReconnectingClient(address, this.proposal.getProposal(), listener, tcpStrategyFactory, sessionStrategy);
+               return new ListenerRegistration<BGPSessionListener>(listener) {
+                       @Override
+                       protected void removeRegistration() {
+                               s.cancel(true);
+                       }
+               };
        }
 
        @Override
index 10d3ab5965f94a012f361e6a2b29999d7ffea072..e5b9faeef64beb11e3e28ee3f5566600814788b4 100644 (file)
@@ -57,8 +57,7 @@ public final class BGPPeer implements BGPSessionListener, Peer {
                }
        }
 
-       @Override
-       public void onSessionDown(final BGPSession session, final Exception e) {
+       private void cleanup() {
                // FIXME: support graceful restart
                for (final TablesKey key : this.tables) {
                        this.rib.clearTable(this, key);
@@ -67,13 +66,20 @@ public final class BGPPeer implements BGPSessionListener, Peer {
                this.tables.clear();
        }
 
+       @Override
+       public void onSessionDown(final BGPSession session, final Exception e) {
+               logger.info("Session with peer {} went down", this.name, e);
+               cleanup();
+       }
+
        @Override
        public void onSessionTerminated(final BGPSession session, final BGPTerminationReason cause) {
                logger.info("Session with peer {} terminated: {}", this.name, cause);
+               cleanup();
        }
 
        @Override
-       public String toString() {
+       public final String toString() {
                return addToStringAttributes(Objects.toStringHelper(this)).toString();
        }
 
index 433bfbafd046c9b76d516ad8811a10a2a8eee38b..9446f4af92b63cb6e69fc97fbff6a926f7cde001 100644 (file)
@@ -14,6 +14,7 @@ import java.net.InetSocketAddress;
 import org.opendaylight.protocol.bgp.parser.BGPSession;
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 
 /**
  * Dispatcher class for creating BGP clients.
@@ -29,4 +30,7 @@ public interface BGPDispatcher {
         */
        Future<? extends BGPSession> createClient(InetSocketAddress address, BGPSessionPreferences preferences, BGPSessionListener listener,
                        final ReconnectStrategy strategy);
+
+       Future<Void> createReconnectingClient(InetSocketAddress address, BGPSessionPreferences preferences, BGPSessionListener listener,
+                       ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy);
 }
index 8f2a806a6411d933ed3ce3ce04a8c1b9b1d01062..086b1f6e9e619c8ece67f06f7c9890944ca0d4dd 100644 (file)
@@ -25,12 +25,13 @@ import org.mockito.MockitoAnnotations;
 import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
 import org.opendaylight.protocol.bgp.parser.BGPSession;
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
-import org.opendaylight.protocol.bgp.rib.impl.BGPImpl.BGPListenerRegistration;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionProposal;
+import org.opendaylight.protocol.concepts.ListenerRegistration;
 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130918.open.BgpParameters;
 
 public class BGPImplTest {
@@ -45,7 +46,7 @@ public class BGPImplTest {
        private BGPMessageFactory parser;
 
        @Mock
-       private Future<BGPSession> future;
+       private Future<Void> future;
 
        private BGPImpl bgp;
 
@@ -54,17 +55,21 @@ public class BGPImplTest {
                MockitoAnnotations.initMocks(this);
                doReturn("").when(this.parser).toString();
 
-               doReturn(null).when(this.future).get();
-               doReturn(this.future).when(this.disp).createClient(any(InetSocketAddress.class), any(BGPSessionPreferences.class),
-                               any(BGPSessionListener.class), any(ReconnectStrategy.class));
+               doReturn(this.future).when(this.disp).createReconnectingClient(any(InetSocketAddress.class), any(BGPSessionPreferences.class),
+                               any(BGPSessionListener.class), any(ReconnectStrategyFactory.class), any(ReconnectStrategy.class));
        }
 
        @Test
        public void testBgpImpl() throws Exception {
                doReturn(new BGPSessionPreferences(0, 0, null, Collections.<BgpParameters> emptyList())).when(this.prop).getProposal();
                this.bgp = new BGPImpl(this.disp, new InetSocketAddress(InetAddress.getLoopbackAddress(), 2000), this.prop);
-               final BGPListenerRegistration reg = this.bgp.registerUpdateListener(new SimpleSessionListener(),
-                               new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
+               final ListenerRegistration<?> reg = this.bgp.registerUpdateListener(new SimpleSessionListener(),
+                               new ReconnectStrategyFactory() {
+                       @Override
+                       public ReconnectStrategy createReconnectStrategy() {
+                               return new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000);
+                       }
+               }, new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
                assertEquals(SimpleSessionListener.class, reg.getListener().getClass());
        }
 
index 1b77f841d11c733444d83b3c6547792866c7134c..22756166b6de98fcc8c78c8eb6949df71929f784 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.protocol.framework.DeserializerException;
 import org.opendaylight.protocol.framework.DocumentedException;
 import org.opendaylight.protocol.framework.ProtocolMessageFactory;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.protocol.util.ByteArray;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130918.NotifyBuilder;
 import org.opendaylight.yangtools.yang.binding.Notification;
@@ -78,16 +79,6 @@ public final class BGPMock implements BGP, Closeable {
                return messages;
        }
 
-       /**
-        * @param listener BGPListener
-        * @return ListenerRegistration
-        */
-       @Override
-       public synchronized ListenerRegistration<BGPSessionListener> registerUpdateListener(final BGPSessionListener listener,
-                       final ReconnectStrategy strategy) {
-               return EventBusRegistration.createAndRegister(this.eventBus, listener, this.allPreviousBGPMessages);
-       }
-
        public synchronized void insertConnectionLostEvent() {
                this.insertMessage(CONNECTION_LOST_MAGIC_MSG);
        }
@@ -129,4 +120,12 @@ public final class BGPMock implements BGP, Closeable {
        public EventBus getEventBus() {
                return this.eventBus;
        }
+
+       @Override
+       public ListenerRegistration<BGPSessionListener> registerUpdateListener(
+                       final BGPSessionListener listener,
+                       final ReconnectStrategyFactory tcpStrategyFactory,
+                       final ReconnectStrategy sessionStrategy) {
+               return EventBusRegistration.createAndRegister(this.eventBus, listener, this.allPreviousBGPMessages);
+       }
 }