From c89042789100092af06b263b959bbb67913678cd Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 5 Dec 2013 22:22:57 +0100 Subject: [PATCH] BUG-197: improve session ID tracking Change-Id: I45c4af7ec581b5d17dffea27f4e51219a26105f7 Signed-off-by: Robert Varga --- .../impl/AbstractPCEPSessionNegotiator.java | 1 + .../AbstractPCEPSessionNegotiatorFactory.java | 97 +++++++++++++------ .../protocol/pcep/impl/PCEPSessionImpl.java | 4 +- .../protocol/pcep/impl/PeerRecord.java | 44 +++++++++ .../pcep/impl/FiniteStateMachineTest.java | 12 ++- .../topology/provider/ParserToSalTest.java | 7 +- 6 files changed, 129 insertions(+), 36 deletions(-) create mode 100644 pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PeerRecord.java diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiator.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiator.java index 50e1112d0d..06ffa43dd0 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiator.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiator.java @@ -210,6 +210,7 @@ public abstract class AbstractPCEPSessionNegotiator extends AbstractSessionNegot if (msg instanceof Keepalive) { this.localOK = true; if (this.remoteOK) { + LOG.info("Channel {} completed negotiation", this.channel); negotiationSuccessful(createSession(this.timer, this.channel, this.localPrefs, this.remotePrefs)); this.state = State.Finished; } else { diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiatorFactory.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiatorFactory.java index 24eccb3a90..3b4933299b 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiatorFactory.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiatorFactory.java @@ -12,12 +12,11 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.util.concurrent.Promise; -import java.io.Closeable; -import java.io.IOException; import java.net.InetSocketAddress; import java.util.Comparator; -import java.util.Map; -import java.util.WeakHashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; @@ -30,6 +29,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.typ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.primitives.UnsignedBytes; @@ -39,11 +40,38 @@ import com.google.common.primitives.UnsignedBytes; * further subclassed to provide either a client or server factory. */ public abstract class AbstractPCEPSessionNegotiatorFactory implements - SessionNegotiatorFactory { +SessionNegotiatorFactory { private static final Comparator COMPARATOR = UnsignedBytes.lexicographicalComparator(); private static final Logger LOG = LoggerFactory.getLogger(AbstractPCEPSessionNegotiatorFactory.class); - private final BiMap sessions = HashBiMap.create(); - private final Map sessionIds = new WeakHashMap<>(); + + /** + * The total amount of time we should remember a peer having been present, unless some other pressure + * forces us to forget about it due to {@link PEER_CACHE_SIZE}. + */ + private static final long PEER_CACHE_SECONDS = 24 * 3600; + + /** + * Maximum total number of peers we keep track of. Combined with {@link PEER_CACHE_SECONDS}, this defines + * how many peers we can see turn around. + */ + private static final long PEER_CACHE_SIZE = 1024; + + /** + * The maximum lifetime for which we should hold on to a session ID before assuming it is okay to reuse + * it. + */ + private static final long ID_CACHE_SECONDS = 3*3600; + + @GuardedBy("this") + private final BiMap sessions = HashBiMap.create(); + + @GuardedBy("this") + private final Cache formerClients = CacheBuilder.newBuilder(). + expireAfterAccess(PEER_CACHE_SECONDS, TimeUnit.SECONDS).maximumSize(PEER_CACHE_SIZE).build(); + + private interface SessionReference extends AutoCloseable { + Short getSessionId(); + } /** * Create a new negotiator. This method needs to be implemented by subclasses to actually provide a negotiator. @@ -65,7 +93,7 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements LOG.debug("Instantiating bootstrap negotiator for channel {}", channel); return new AbstractSessionNegotiator(promise, channel) { @Override - protected void startNegotiation() throws Exception { + protected void startNegotiation() throws ExecutionException { LOG.debug("Bootstrap negotiation for channel {} started", this.channel); /* @@ -75,15 +103,14 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements final byte[] clientAddress = ((InetSocketAddress) this.channel.remoteAddress()).getAddress().getAddress(); synchronized (lock) { - if (AbstractPCEPSessionNegotiatorFactory.this.sessions.containsKey(clientAddress)) { final byte[] serverAddress = ((InetSocketAddress) this.channel.localAddress()).getAddress().getAddress(); if (COMPARATOR.compare(serverAddress, clientAddress) > 0) { - final Closeable n = AbstractPCEPSessionNegotiatorFactory.this.sessions.remove(clientAddress); + final SessionReference n = AbstractPCEPSessionNegotiatorFactory.this.sessions.remove(clientAddress); try { n.close(); - } catch (final IOException e) { - LOG.warn("Unexpected failure to close old session", e); + } catch (final Exception e) { + LOG.error("Unexpected failure to close old session", e); } } else { negotiationFailed(new RuntimeException("A conflicting session for address " @@ -92,13 +119,27 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements } } - final short sessionId = nextSession(clientAddress); + final Short sessionId = nextSession(clientAddress); final AbstractPCEPSessionNegotiator n = createNegotiator(promise, factory.getSessionListener(), this.channel, sessionId); - AbstractPCEPSessionNegotiatorFactory.this.sessions.put(clientAddress, new Closeable() { + AbstractPCEPSessionNegotiatorFactory.this.sessions.put(clientAddress, new SessionReference() { + @Override + public void close() throws ExecutionException { + try { + formerClients.get(clientAddress, new Callable() { + @Override + public PeerRecord call() { + return new PeerRecord(ID_CACHE_SECONDS, getSessionId()); + } + }); + } finally { + channel.close(); + } + } + @Override - public void close() { - channel.close(); + public Short getSessionId() { + return sessionId; } }); @@ -111,7 +152,7 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements } }); - LOG.debug("Replacing bootstrap negotiator for channel {}", this.channel); + LOG.info("Replacing bootstrap negotiator for channel {}", this.channel); this.channel.pipeline().replace(this, "negotiator", n); n.startNegotiation(); } @@ -125,18 +166,14 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements } @GuardedBy("this") - private short nextSession(final byte[] clientAddress) { - /* - * FIXME: BUG-197: Improve the allocation algorithm to make sure: - * - no duplicate IDs are assigned - * - we retain former session IDs for a reasonable time - */ - Short next = this.sessionIds.get(clientAddress); - if (next == null) { - next = 0; - } - - this.sessionIds.put(clientAddress, (short) ((next + 1) % 255)); - return next; + private Short nextSession(final byte[] clientAddress) throws ExecutionException { + final PeerRecord peer = formerClients.get(clientAddress, new Callable() { + @Override + public PeerRecord call() throws Exception { + return new PeerRecord(ID_CACHE_SECONDS, null); + } + }); + + return peer.allocId(); } } diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java index 4302612e30..1c21ea5cdc 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java @@ -133,7 +133,7 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements }, getKeepAliveTimerValue(), TimeUnit.SECONDS); } - LOG.debug("Session started."); + LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(), remoteOpen.getSessionId()); } /** @@ -250,6 +250,7 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements } private synchronized void terminate(final TerminationReason reason) { + LOG.info("Local session termination : {}", reason); this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason)); this.closed = true; this.sendMessage(new CloseBuilder().setCCloseMessage( @@ -378,6 +379,7 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements } protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + toStringHelper.add("channel", this.channel); toStringHelper.add("localOpen", this.localOpen); toStringHelper.add("remoteOpen", this.remoteOpen); return toStringHelper; diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PeerRecord.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PeerRecord.java new file mode 100644 index 0000000000..e3a58c8245 --- /dev/null +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PeerRecord.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.protocol.pcep.impl; + +import java.util.concurrent.TimeUnit; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +@ThreadSafe +final class PeerRecord { + @GuardedBy("this") + private final Cache pastIds; + + @GuardedBy("this") + private Short lastId; + + PeerRecord(final long idLifetimeSeconds, final Short lastId) { + // Note that the cache is limited to 255 entries -- which means we will always have + // a single entry available. That number will be the Last Recently Used ID. + pastIds = CacheBuilder.newBuilder().expireAfterWrite(idLifetimeSeconds, TimeUnit.SECONDS).maximumSize(255).build(); + this.lastId = lastId; + } + + synchronized Short allocId() { + Short id = lastId == null ? 0 : lastId; + + while (pastIds.getIfPresent(id) != null) { + id = (short) ((id + 1) % 255); + } + + pastIds.put(id, id); + lastId = id; + return id; + } +} diff --git a/pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/FiniteStateMachineTest.java b/pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/FiniteStateMachineTest.java index 7beb739e6d..f682c5589d 100644 --- a/pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/FiniteStateMachineTest.java +++ b/pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/FiniteStateMachineTest.java @@ -22,6 +22,7 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.GlobalEventExecutor; +import java.net.SocketAddress; import java.util.Arrays; import java.util.List; @@ -64,6 +65,9 @@ public class FiniteStateMachineTest { @Mock private ChannelPipeline pipeline; + @Mock + private SocketAddress address; + private final List receivedMsgs = Lists.newArrayList(); private Open openmsg; @@ -87,6 +91,8 @@ public class FiniteStateMachineTest { }).when(this.clientListener).writeAndFlush(any(Notification.class)); doReturn("TestingChannel").when(this.clientListener).toString(); doReturn(this.pipeline).when(this.clientListener).pipeline(); + doReturn(this.address).when(this.clientListener).localAddress(); + doReturn(this.address).when(this.clientListener).remoteAddress(); doReturn(this.pipeline).when(this.pipeline).replace(any(ChannelHandler.class), any(String.class), any(ChannelHandler.class)); doReturn(true).when(this.clientListener).isActive(); doReturn(mock(ChannelFuture.class)).when(this.clientListener).close(); @@ -143,9 +149,9 @@ public class FiniteStateMachineTest { new SessionBuilder().setOpen( new org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.OpenBuilder().setKeepalive( (short) 1).build()).build()).build()).setErrors( - Arrays.asList(new ErrorsBuilder().setErrorObject( - new ErrorObjectBuilder().setType(maping.getFromErrorsEnum(e).type).setValue( - maping.getFromErrorsEnum(e).value).build()).build())).build()).build(); + Arrays.asList(new ErrorsBuilder().setErrorObject( + new ErrorObjectBuilder().setType(maping.getFromErrorsEnum(e).type).setValue( + maping.getFromErrorsEnum(e).value).build()).build())).build()).build(); } /** diff --git a/pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/ParserToSalTest.java b/pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/ParserToSalTest.java index 5300b79d37..d7d64b12eb 100644 --- a/pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/ParserToSalTest.java +++ b/pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/ParserToSalTest.java @@ -111,8 +111,11 @@ public class ParserToSalTest { doReturn(this.pipeline).when(this.clientListener).pipeline(); doReturn(this.pipeline).when(this.pipeline).replace(any(ChannelHandler.class), any(String.class), any(ChannelHandler.class)); doReturn(true).when(this.clientListener).isActive(); - final SocketAddress sa = new InetSocketAddress("127.0.0.1", 4189); - doReturn(sa).when(this.clientListener).remoteAddress(); + final SocketAddress ra = new InetSocketAddress("127.0.0.1", 4189); + doReturn(ra).when(this.clientListener).remoteAddress(); + final SocketAddress la = new InetSocketAddress("127.0.0.1", 30000); + doReturn(la).when(this.clientListener).localAddress(); + doReturn(mock(ChannelFuture.class)).when(this.clientListener).close(); Mockito.doReturn(this.mockedTransaction).when(this.providerService).beginTransaction(); -- 2.36.6