BUG-197: improve session ID tracking
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / AbstractPCEPSessionNegotiatorFactory.java
index 24eccb3a908bd687d9ae1f32ce0ee264c2569a85..3b4933299bdbcb472f6727c90f7130503e93785f 100644 (file)
@@ -12,12 +12,11 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.util.concurrent.Promise;
 
-import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Comparator;
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.concurrent.GuardedBy;
 
@@ -30,6 +29,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.typ
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.primitives.UnsignedBytes;
@@ -39,11 +40,38 @@ import com.google.common.primitives.UnsignedBytes;
  * further subclassed to provide either a client or server factory.
  */
 public abstract class AbstractPCEPSessionNegotiatorFactory implements
-               SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> {
+SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> {
        private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
        private static final Logger LOG = LoggerFactory.getLogger(AbstractPCEPSessionNegotiatorFactory.class);
-       private final BiMap<byte[], Closeable> sessions = HashBiMap.create();
-       private final Map<byte[], Short> sessionIds = new WeakHashMap<>();
+
+       /**
+        * The total amount of time we should remember a peer having been present, unless some other pressure
+        * forces us to forget about it due to {@link PEER_CACHE_SIZE}.
+        */
+       private static final long PEER_CACHE_SECONDS = 24 * 3600;
+
+       /**
+        * Maximum total number of peers we keep track of. Combined with {@link PEER_CACHE_SECONDS}, this defines
+        * how many peers we can see turn around.
+        */
+       private static final long PEER_CACHE_SIZE = 1024;
+
+       /**
+        * The maximum lifetime for which we should hold on to a session ID before assuming it is okay to reuse
+        * it.
+        */
+       private static final long ID_CACHE_SECONDS = 3*3600;
+
+       @GuardedBy("this")
+       private final BiMap<byte[], SessionReference> sessions = HashBiMap.create();
+
+       @GuardedBy("this")
+       private final Cache<byte[], PeerRecord> formerClients = CacheBuilder.newBuilder().
+       expireAfterAccess(PEER_CACHE_SECONDS, TimeUnit.SECONDS).maximumSize(PEER_CACHE_SIZE).build();
+
+       private interface SessionReference extends AutoCloseable {
+               Short getSessionId();
+       }
 
        /**
         * Create a new negotiator. This method needs to be implemented by subclasses to actually provide a negotiator.
@@ -65,7 +93,7 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
                LOG.debug("Instantiating bootstrap negotiator for channel {}", channel);
                return new AbstractSessionNegotiator<Message, PCEPSessionImpl>(promise, channel) {
                        @Override
-                       protected void startNegotiation() throws Exception {
+                       protected void startNegotiation() throws ExecutionException {
                                LOG.debug("Bootstrap negotiation for channel {} started", this.channel);
 
                                /*
@@ -75,15 +103,14 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
                                final byte[] clientAddress = ((InetSocketAddress) this.channel.remoteAddress()).getAddress().getAddress();
 
                                synchronized (lock) {
-
                                        if (AbstractPCEPSessionNegotiatorFactory.this.sessions.containsKey(clientAddress)) {
                                                final byte[] serverAddress = ((InetSocketAddress) this.channel.localAddress()).getAddress().getAddress();
                                                if (COMPARATOR.compare(serverAddress, clientAddress) > 0) {
-                                                       final Closeable n = AbstractPCEPSessionNegotiatorFactory.this.sessions.remove(clientAddress);
+                                                       final SessionReference n = AbstractPCEPSessionNegotiatorFactory.this.sessions.remove(clientAddress);
                                                        try {
                                                                n.close();
-                                                       } catch (final IOException e) {
-                                                               LOG.warn("Unexpected failure to close old session", e);
+                                                       } catch (final Exception e) {
+                                                               LOG.error("Unexpected failure to close old session", e);
                                                        }
                                                } else {
                                                        negotiationFailed(new RuntimeException("A conflicting session for address "
@@ -92,13 +119,27 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
                                                }
                                        }
 
-                                       final short sessionId = nextSession(clientAddress);
+                                       final Short sessionId = nextSession(clientAddress);
                                        final AbstractPCEPSessionNegotiator n = createNegotiator(promise, factory.getSessionListener(), this.channel, sessionId);
 
-                                       AbstractPCEPSessionNegotiatorFactory.this.sessions.put(clientAddress, new Closeable() {
+                                       AbstractPCEPSessionNegotiatorFactory.this.sessions.put(clientAddress, new SessionReference() {
+                                               @Override
+                                               public void close() throws ExecutionException {
+                                                       try {
+                                                               formerClients.get(clientAddress, new Callable<PeerRecord>() {
+                                                                       @Override
+                                                                       public PeerRecord call() {
+                                                                               return new PeerRecord(ID_CACHE_SECONDS, getSessionId());
+                                                                       }
+                                                               });
+                                                       } finally {
+                                                               channel.close();
+                                                       }
+                                               }
+
                                                @Override
-                                               public void close() {
-                                                       channel.close();
+                                               public Short getSessionId() {
+                                                       return sessionId;
                                                }
                                        });
 
@@ -111,7 +152,7 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
                                                }
                                        });
 
-                                       LOG.debug("Replacing bootstrap negotiator for channel {}", this.channel);
+                                       LOG.info("Replacing bootstrap negotiator for channel {}", this.channel);
                                        this.channel.pipeline().replace(this, "negotiator", n);
                                        n.startNegotiation();
                                }
@@ -125,18 +166,14 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
        }
 
        @GuardedBy("this")
-       private short nextSession(final byte[] clientAddress) {
-               /*
-                * FIXME: BUG-197: Improve the allocation algorithm to make sure:
-                * - no duplicate IDs are assigned
-                * - we retain former session IDs for a reasonable time
-                */
-               Short next = this.sessionIds.get(clientAddress);
-               if (next == null) {
-                       next = 0;
-               }
-
-               this.sessionIds.put(clientAddress, (short) ((next + 1) % 255));
-               return next;
+       private Short nextSession(final byte[] clientAddress) throws ExecutionException {
+               final PeerRecord peer = formerClients.get(clientAddress, new Callable<PeerRecord>() {
+                       @Override
+                       public PeerRecord call() throws Exception {
+                               return new PeerRecord(ID_CACHE_SECONDS, null);
+                       }
+               });
+
+               return peer.allocId();
        }
 }