BUG-197: improve session ID tracking 24/3524/2
authorRobert Varga <rovarga@cisco.com>
Thu, 5 Dec 2013 21:22:57 +0000 (22:22 +0100)
committerRobert Varga <rovarga@cisco.com>
Fri, 6 Dec 2013 14:01:22 +0000 (15:01 +0100)
Change-Id: I45c4af7ec581b5d17dffea27f4e51219a26105f7
Signed-off-by: Robert Varga <rovarga@cisco.com>
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiator.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiatorFactory.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PeerRecord.java [new file with mode: 0644]
pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/FiniteStateMachineTest.java
pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/ParserToSalTest.java

index 50e1112d0d13ca9b4d0686fdea762d9d763199ce..06ffa43dd0590785d4ae5bd04764ad5137b0af16 100644 (file)
@@ -210,6 +210,7 @@ public abstract class AbstractPCEPSessionNegotiator extends AbstractSessionNegot
                        if (msg instanceof Keepalive) {
                                this.localOK = true;
                                if (this.remoteOK) {
+                                       LOG.info("Channel {} completed negotiation", this.channel);
                                        negotiationSuccessful(createSession(this.timer, this.channel, this.localPrefs, this.remotePrefs));
                                        this.state = State.Finished;
                                } else {
index 24eccb3a908bd687d9ae1f32ce0ee264c2569a85..3b4933299bdbcb472f6727c90f7130503e93785f 100644 (file)
@@ -12,12 +12,11 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.util.concurrent.Promise;
 
-import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Comparator;
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.concurrent.GuardedBy;
 
@@ -30,6 +29,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.typ
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.primitives.UnsignedBytes;
@@ -39,11 +40,38 @@ import com.google.common.primitives.UnsignedBytes;
  * further subclassed to provide either a client or server factory.
  */
 public abstract class AbstractPCEPSessionNegotiatorFactory implements
-               SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> {
+SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> {
        private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
        private static final Logger LOG = LoggerFactory.getLogger(AbstractPCEPSessionNegotiatorFactory.class);
-       private final BiMap<byte[], Closeable> sessions = HashBiMap.create();
-       private final Map<byte[], Short> sessionIds = new WeakHashMap<>();
+
+       /**
+        * The total amount of time we should remember a peer having been present, unless some other pressure
+        * forces us to forget about it due to {@link PEER_CACHE_SIZE}.
+        */
+       private static final long PEER_CACHE_SECONDS = 24 * 3600;
+
+       /**
+        * Maximum total number of peers we keep track of. Combined with {@link PEER_CACHE_SECONDS}, this defines
+        * how many peers we can see turn around.
+        */
+       private static final long PEER_CACHE_SIZE = 1024;
+
+       /**
+        * The maximum lifetime for which we should hold on to a session ID before assuming it is okay to reuse
+        * it.
+        */
+       private static final long ID_CACHE_SECONDS = 3*3600;
+
+       @GuardedBy("this")
+       private final BiMap<byte[], SessionReference> sessions = HashBiMap.create();
+
+       @GuardedBy("this")
+       private final Cache<byte[], PeerRecord> formerClients = CacheBuilder.newBuilder().
+       expireAfterAccess(PEER_CACHE_SECONDS, TimeUnit.SECONDS).maximumSize(PEER_CACHE_SIZE).build();
+
+       private interface SessionReference extends AutoCloseable {
+               Short getSessionId();
+       }
 
        /**
         * Create a new negotiator. This method needs to be implemented by subclasses to actually provide a negotiator.
@@ -65,7 +93,7 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
                LOG.debug("Instantiating bootstrap negotiator for channel {}", channel);
                return new AbstractSessionNegotiator<Message, PCEPSessionImpl>(promise, channel) {
                        @Override
-                       protected void startNegotiation() throws Exception {
+                       protected void startNegotiation() throws ExecutionException {
                                LOG.debug("Bootstrap negotiation for channel {} started", this.channel);
 
                                /*
@@ -75,15 +103,14 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
                                final byte[] clientAddress = ((InetSocketAddress) this.channel.remoteAddress()).getAddress().getAddress();
 
                                synchronized (lock) {
-
                                        if (AbstractPCEPSessionNegotiatorFactory.this.sessions.containsKey(clientAddress)) {
                                                final byte[] serverAddress = ((InetSocketAddress) this.channel.localAddress()).getAddress().getAddress();
                                                if (COMPARATOR.compare(serverAddress, clientAddress) > 0) {
-                                                       final Closeable n = AbstractPCEPSessionNegotiatorFactory.this.sessions.remove(clientAddress);
+                                                       final SessionReference n = AbstractPCEPSessionNegotiatorFactory.this.sessions.remove(clientAddress);
                                                        try {
                                                                n.close();
-                                                       } catch (final IOException e) {
-                                                               LOG.warn("Unexpected failure to close old session", e);
+                                                       } catch (final Exception e) {
+                                                               LOG.error("Unexpected failure to close old session", e);
                                                        }
                                                } else {
                                                        negotiationFailed(new RuntimeException("A conflicting session for address "
@@ -92,13 +119,27 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
                                                }
                                        }
 
-                                       final short sessionId = nextSession(clientAddress);
+                                       final Short sessionId = nextSession(clientAddress);
                                        final AbstractPCEPSessionNegotiator n = createNegotiator(promise, factory.getSessionListener(), this.channel, sessionId);
 
-                                       AbstractPCEPSessionNegotiatorFactory.this.sessions.put(clientAddress, new Closeable() {
+                                       AbstractPCEPSessionNegotiatorFactory.this.sessions.put(clientAddress, new SessionReference() {
+                                               @Override
+                                               public void close() throws ExecutionException {
+                                                       try {
+                                                               formerClients.get(clientAddress, new Callable<PeerRecord>() {
+                                                                       @Override
+                                                                       public PeerRecord call() {
+                                                                               return new PeerRecord(ID_CACHE_SECONDS, getSessionId());
+                                                                       }
+                                                               });
+                                                       } finally {
+                                                               channel.close();
+                                                       }
+                                               }
+
                                                @Override
-                                               public void close() {
-                                                       channel.close();
+                                               public Short getSessionId() {
+                                                       return sessionId;
                                                }
                                        });
 
@@ -111,7 +152,7 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
                                                }
                                        });
 
-                                       LOG.debug("Replacing bootstrap negotiator for channel {}", this.channel);
+                                       LOG.info("Replacing bootstrap negotiator for channel {}", this.channel);
                                        this.channel.pipeline().replace(this, "negotiator", n);
                                        n.startNegotiation();
                                }
@@ -125,18 +166,14 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
        }
 
        @GuardedBy("this")
-       private short nextSession(final byte[] clientAddress) {
-               /*
-                * FIXME: BUG-197: Improve the allocation algorithm to make sure:
-                * - no duplicate IDs are assigned
-                * - we retain former session IDs for a reasonable time
-                */
-               Short next = this.sessionIds.get(clientAddress);
-               if (next == null) {
-                       next = 0;
-               }
-
-               this.sessionIds.put(clientAddress, (short) ((next + 1) % 255));
-               return next;
+       private Short nextSession(final byte[] clientAddress) throws ExecutionException {
+               final PeerRecord peer = formerClients.get(clientAddress, new Callable<PeerRecord>() {
+                       @Override
+                       public PeerRecord call() throws Exception {
+                               return new PeerRecord(ID_CACHE_SECONDS, null);
+                       }
+               });
+
+               return peer.allocId();
        }
 }
index 4302612e30876524bad0a1081c08f98be7972ae2..1c21ea5cdce0d5a2ffd996f27b113ab7c988cc65 100644 (file)
@@ -133,7 +133,7 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
                        }, getKeepAliveTimerValue(), TimeUnit.SECONDS);
                }
 
-               LOG.debug("Session started.");
+               LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(), remoteOpen.getSessionId());
        }
 
        /**
@@ -250,6 +250,7 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
        }
 
        private synchronized void terminate(final TerminationReason reason) {
+               LOG.info("Local session termination : {}", reason);
                this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
                this.closed = true;
                this.sendMessage(new CloseBuilder().setCCloseMessage(
@@ -378,6 +379,7 @@ public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements
        }
 
        protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+               toStringHelper.add("channel", this.channel);
                toStringHelper.add("localOpen", this.localOpen);
                toStringHelper.add("remoteOpen", this.remoteOpen);
                return toStringHelper;
diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PeerRecord.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PeerRecord.java
new file mode 100644 (file)
index 0000000..e3a58c8
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.pcep.impl;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+@ThreadSafe
+final class PeerRecord {
+       @GuardedBy("this")
+       private final Cache<Short, Short> pastIds;
+
+       @GuardedBy("this")
+       private Short lastId;
+
+       PeerRecord(final long idLifetimeSeconds, final Short lastId) {
+               // Note that the cache is limited to 255 entries -- which means we will always have
+               // a single entry available. That number will be the Last Recently Used ID.
+               pastIds = CacheBuilder.newBuilder().expireAfterWrite(idLifetimeSeconds, TimeUnit.SECONDS).maximumSize(255).build();
+               this.lastId = lastId;
+       }
+
+       synchronized Short allocId() {
+               Short id = lastId == null ? 0 : lastId;
+
+               while (pastIds.getIfPresent(id) != null) {
+                       id = (short) ((id + 1) % 255);
+               }
+
+               pastIds.put(id, id);
+               lastId = id;
+               return id;
+       }
+}
index 7beb739e6ded5b95fb13d5288f56451c96f2967c..f682c5589d43b5a427efaac885c650fc56a9c728 100644 (file)
@@ -22,6 +22,7 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
+import java.net.SocketAddress;
 import java.util.Arrays;
 import java.util.List;
 
@@ -64,6 +65,9 @@ public class FiniteStateMachineTest {
        @Mock
        private ChannelPipeline pipeline;
 
+       @Mock
+       private SocketAddress address;
+
        private final List<Notification> receivedMsgs = Lists.newArrayList();
 
        private Open openmsg;
@@ -87,6 +91,8 @@ public class FiniteStateMachineTest {
                }).when(this.clientListener).writeAndFlush(any(Notification.class));
                doReturn("TestingChannel").when(this.clientListener).toString();
                doReturn(this.pipeline).when(this.clientListener).pipeline();
+               doReturn(this.address).when(this.clientListener).localAddress();
+               doReturn(this.address).when(this.clientListener).remoteAddress();
                doReturn(this.pipeline).when(this.pipeline).replace(any(ChannelHandler.class), any(String.class), any(ChannelHandler.class));
                doReturn(true).when(this.clientListener).isActive();
                doReturn(mock(ChannelFuture.class)).when(this.clientListener).close();
@@ -143,9 +149,9 @@ public class FiniteStateMachineTest {
                                                                new SessionBuilder().setOpen(
                                                                                new org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.OpenBuilder().setKeepalive(
                                                                                                (short) 1).build()).build()).build()).setErrors(
-                                               Arrays.asList(new ErrorsBuilder().setErrorObject(
-                                                               new ErrorObjectBuilder().setType(maping.getFromErrorsEnum(e).type).setValue(
-                                                                               maping.getFromErrorsEnum(e).value).build()).build())).build()).build();
+                                                                                                               Arrays.asList(new ErrorsBuilder().setErrorObject(
+                                                                                                                               new ErrorObjectBuilder().setType(maping.getFromErrorsEnum(e).type).setValue(
+                                                                                                                                               maping.getFromErrorsEnum(e).value).build()).build())).build()).build();
        }
 
        /**
index 5300b79d372d5d2fe82edf3dbbbc95c8580af9a3..d7d64b12eb84a3e5b5dda0a4ef901698cffdb098 100644 (file)
@@ -111,8 +111,11 @@ public class ParserToSalTest {
                doReturn(this.pipeline).when(this.clientListener).pipeline();
                doReturn(this.pipeline).when(this.pipeline).replace(any(ChannelHandler.class), any(String.class), any(ChannelHandler.class));
                doReturn(true).when(this.clientListener).isActive();
-               final SocketAddress sa = new InetSocketAddress("127.0.0.1", 4189);
-               doReturn(sa).when(this.clientListener).remoteAddress();
+               final SocketAddress ra = new InetSocketAddress("127.0.0.1", 4189);
+               doReturn(ra).when(this.clientListener).remoteAddress();
+               final SocketAddress la = new InetSocketAddress("127.0.0.1", 30000);
+               doReturn(la).when(this.clientListener).localAddress();
+
                doReturn(mock(ChannelFuture.class)).when(this.clientListener).close();
 
                Mockito.doReturn(this.mockedTransaction).when(this.providerService).beginTransaction();