Do not instantiate executor service for tasks 23/78823/2
authorRobert Varga <robert.varga@pantheon.tech>
Sat, 15 Dec 2018 19:01:09 +0000 (20:01 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Sat, 15 Dec 2018 19:35:23 +0000 (20:35 +0100)
Netty channel (which backs each BGPSession) is itself backed by
an EventLoop, which is an implementation of ScheduledExecutorService.

Take advantage of this to schedule the timers we need, instead
of leaking threads by allocating executor services again and
again.

Change-Id: I6bb4bf8d63abd0af0c8cf10012a905db8fa0975d
JIRA: BGPCEP-495
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPPeer.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java
bgp/rib-mock/pom.xml
bgp/rib-mock/src/main/java/org/opendaylight/protocol/bgp/rib/mock/EventBusRegistration.java
bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/BGPSession.java

index 1a7eb3c500ce15ccbb233f0788626bcfac2c63aa..32ef577636ca21de24db0a4e3f57436414b1da2e 100644 (file)
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.concurrent.GuardedBy;
@@ -599,8 +598,8 @@ public class BGPPeer extends AbstractPeer implements BGPSessionListener {
             setAfiSafiGracefulRestartState(0, false, false);
             onSessionTerminated(this.session, new BGPTerminationReason(BGPError.HOLD_TIMER_EXPIRED));
         }
-        new ScheduledThreadPoolExecutor(1)
-                .schedule(this::handleRestartTimer, peerRestartTimeNanos - elapsedNanos, TimeUnit.NANOSECONDS);
+
+        this.session.schedule(this::handleRestartTimer, peerRestartTimeNanos - elapsedNanos, TimeUnit.NANOSECONDS);
     }
 
     private synchronized void handleSelectionReferralTimer() {
@@ -614,8 +613,8 @@ public class BGPPeer extends AbstractPeer implements BGPSessionListener {
             this.missingEOT.clear();
             handleGracefulEndOfRib();
         }
-        new ScheduledThreadPoolExecutor(1)
-                .schedule(this::handleSelectionReferralTimer, referalTimerNanos - elapsedNanos, TimeUnit.NANOSECONDS);
+        this.session.schedule(this::handleSelectionReferralTimer, referalTimerNanos - elapsedNanos,
+            TimeUnit.NANOSECONDS);
     }
 
     private void releaseConnectionGracefully() {
index abf3b7f09a7146c67c2cd8754b0cc4e0967683d1..68ab5da2aed60ce48eebf88b25a887776fb119bf 100644 (file)
@@ -20,6 +20,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.concurrent.ScheduledFuture;
 import java.io.IOException;
 import java.nio.channels.NonWritableChannelException;
 import java.util.ArrayList;
@@ -564,4 +565,9 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler<Notification> im
     public <T extends PeerConstraint> void addDecoderConstraint(final Class<T> constraintClass, final T constraint) {
         this.channel.pipeline().get(BGPByteToMessageDecoder.class).addDecoderConstraint(constraintClass, constraint);
     }
+
+    @Override
+    public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+        return this.channel.eventLoop().schedule(command, delay, unit);
+    }
 }
index cb4596f13bfff5e1dc2a1503f3a89e003d5521be..cde2868e2b816f5a31f0e6117d0fd5b9c9a2bec2 100644 (file)
             <groupId>io.netty</groupId>
             <artifactId>netty-buffer</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+        </dependency>
 
         <!-- Testing dependencies -->
         <dependency>
index 6450118daa68d7a28f7bae2aa9abe0703ed549fa..604e0286fffc84673598ba18f9c2bb5e6c0402f5 100644 (file)
@@ -10,11 +10,13 @@ package org.opendaylight.protocol.bgp.rib.mock;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.ScheduledFuture;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
 import org.opendaylight.protocol.bgp.parser.BGPError;
 import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl;
@@ -206,5 +208,10 @@ final class EventBusRegistration extends AbstractListenerRegistration<BGPSession
                 final T constraint) {
             // No-op
         }
+
+        @Override
+        public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+            return null;
+        }
     }
 }
index 1d4861e2073f132fc423e52935dcd40c20711699..6540d5e5cace24aab2da1279687596231bb3cefb 100644 (file)
@@ -8,8 +8,10 @@
 package org.opendaylight.protocol.bgp.rib.spi;
 
 import io.netty.channel.ChannelInboundHandler;
+import io.netty.util.concurrent.ScheduledFuture;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.protocol.bgp.parser.GracefulRestartUtil;
 import org.opendaylight.protocol.bgp.parser.spi.PeerConstraint;
@@ -90,4 +92,14 @@ public interface BGPSession extends AutoCloseable, ChannelInboundHandler {
      * Add peer constraint to session pipeline decoder.
      */
     <T extends PeerConstraint> void addDecoderConstraint(Class<T> constraintClass, T constraint);
+
+    /**
+     * Schedule a task to be executed in the context of the session handling thread.
+     *
+     * @param command the task to execute
+     * @param delay the time from now to delay execution
+     * @param unit the time unit of the delay parameter
+     * @return Future representing the scheduled task.
+     */
+    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
 }