import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.Promise;
-import java.io.Closeable;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Comparator;
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.primitives.UnsignedBytes;
* further subclassed to provide either a client or server factory.
*/
public abstract class AbstractPCEPSessionNegotiatorFactory implements
- SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> {
+SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> {
private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
private static final Logger LOG = LoggerFactory.getLogger(AbstractPCEPSessionNegotiatorFactory.class);
- private final BiMap<byte[], Closeable> sessions = HashBiMap.create();
- private final Map<byte[], Short> sessionIds = new WeakHashMap<>();
+
+ /**
+ * The total amount of time we should remember a peer having been present, unless some other pressure
+ * forces us to forget about it due to {@link #PEER_CACHE_SIZE}.
+ */
+ private static final long PEER_CACHE_SECONDS = 24 * 3600;
+
+ /**
+ * Maximum total number of peers we keep track of. Combined with {@link #PEER_CACHE_SECONDS}, this defines
+ * how many distinct peer turnovers we can track within that window.
+ */
+ private static final long PEER_CACHE_SIZE = 1024;
+
+ /**
+ * The maximum lifetime for which we should hold on to a session ID before assuming it is okay to reuse
+ * it.
+ */
+ private static final long ID_CACHE_SECONDS = 3*3600;
+
+ @GuardedBy("this")
+ private final BiMap<byte[], SessionReference> sessions = HashBiMap.create();
+
+ @GuardedBy("this")
+ private final Cache<byte[], PeerRecord> formerClients = CacheBuilder.newBuilder().
+ expireAfterAccess(PEER_CACHE_SECONDS, TimeUnit.SECONDS).maximumSize(PEER_CACHE_SIZE).build();
+
+ private interface SessionReference extends AutoCloseable {
+ Short getSessionId();
+ }
/**
* Create a new negotiator. This method needs to be implemented by subclasses to actually provide a negotiator.
LOG.debug("Instantiating bootstrap negotiator for channel {}", channel);
return new AbstractSessionNegotiator<Message, PCEPSessionImpl>(promise, channel) {
@Override
- protected void startNegotiation() throws Exception {
+ protected void startNegotiation() throws ExecutionException {
LOG.debug("Bootstrap negotiation for channel {} started", this.channel);
/*
final byte[] clientAddress = ((InetSocketAddress) this.channel.remoteAddress()).getAddress().getAddress();
synchronized (lock) {
-
if (AbstractPCEPSessionNegotiatorFactory.this.sessions.containsKey(clientAddress)) {
final byte[] serverAddress = ((InetSocketAddress) this.channel.localAddress()).getAddress().getAddress();
if (COMPARATOR.compare(serverAddress, clientAddress) > 0) {
- final Closeable n = AbstractPCEPSessionNegotiatorFactory.this.sessions.remove(clientAddress);
+ final SessionReference n = AbstractPCEPSessionNegotiatorFactory.this.sessions.remove(clientAddress);
try {
n.close();
- } catch (final IOException e) {
- LOG.warn("Unexpected failure to close old session", e);
+ } catch (final Exception e) {
+ LOG.error("Unexpected failure to close old session", e);
}
} else {
negotiationFailed(new RuntimeException("A conflicting session for address "
}
}
- final short sessionId = nextSession(clientAddress);
+ final Short sessionId = nextSession(clientAddress);
final AbstractPCEPSessionNegotiator n = createNegotiator(promise, factory.getSessionListener(), this.channel, sessionId);
- AbstractPCEPSessionNegotiatorFactory.this.sessions.put(clientAddress, new Closeable() {
+ AbstractPCEPSessionNegotiatorFactory.this.sessions.put(clientAddress, new SessionReference() {
+ @Override
+ public void close() throws ExecutionException {
+ try {
+ formerClients.get(clientAddress, new Callable<PeerRecord>() {
+ @Override
+ public PeerRecord call() {
+ return new PeerRecord(ID_CACHE_SECONDS, getSessionId());
+ }
+ });
+ } finally {
+ channel.close();
+ }
+ }
+
@Override
- public void close() {
- channel.close();
+ public Short getSessionId() {
+ return sessionId;
}
});
}
});
- LOG.debug("Replacing bootstrap negotiator for channel {}", this.channel);
+ LOG.info("Replacing bootstrap negotiator for channel {}", this.channel);
this.channel.pipeline().replace(this, "negotiator", n);
n.startNegotiation();
}
}
@GuardedBy("this")
- private short nextSession(final byte[] clientAddress) {
- /*
- * FIXME: BUG-197: Improve the allocation algorithm to make sure:
- * - no duplicate IDs are assigned
- * - we retain former session IDs for a reasonable time
- */
- Short next = this.sessionIds.get(clientAddress);
- if (next == null) {
- next = 0;
- }
-
- this.sessionIds.put(clientAddress, (short) ((next + 1) % 255));
- return next;
+ private Short nextSession(final byte[] clientAddress) throws ExecutionException {
+ final PeerRecord peer = formerClients.get(clientAddress, new Callable<PeerRecord>() {
+ @Override
+ public PeerRecord call() throws Exception {
+ return new PeerRecord(ID_CACHE_SECONDS, null);
+ }
+ });
+
+ return peer.allocId();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.pcep.impl;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+@ThreadSafe
+final class PeerRecord {
+    /**
+     * Session IDs recently handed out to this peer. Entries expire after the
+     * configured ID lifetime, at which point the ID becomes reusable.
+     */
+    @GuardedBy("this")
+    private final Cache<Short, Short> pastIds;
+
+    /**
+     * The most recently allocated ID, used as the starting point when searching
+     * for the next free ID. May be null if nothing has been allocated yet.
+     */
+    @GuardedBy("this")
+    private Short lastId;
+
+    PeerRecord(final long idLifetimeSeconds, final Short lastId) {
+        // Session IDs span 0-254, i.e. 255 distinct values (see the modulo in
+        // allocId()). Cap the cache at 254 entries so that at least one ID is
+        // always unused -- with maximumSize(255) every ID could be resident at
+        // once and the search loop in allocId() would never terminate.
+        pastIds = CacheBuilder.newBuilder().expireAfterWrite(idLifetimeSeconds, TimeUnit.SECONDS).maximumSize(254).build();
+        this.lastId = lastId;
+    }
+
+    /**
+     * Allocate the next free session ID for this peer.
+     *
+     * @return an ID in the range 0-254 that is not currently being remembered
+     */
+    synchronized Short allocId() {
+        // Start at the last ID we handed out (or 0) and walk forward modulo
+        // 255, skipping any ID still held in the recent-use cache.
+        Short id = lastId == null ? 0 : lastId;
+
+        while (pastIds.getIfPresent(id) != null) {
+            id = (short) ((id + 1) % 255);
+        }
+
+        pastIds.put(id, id);
+        lastId = id;
+        return id;
+    }
+}
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GlobalEventExecutor;
+import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
@Mock
private ChannelPipeline pipeline;
+ @Mock
+ private SocketAddress address;
+
private final List<Notification> receivedMsgs = Lists.newArrayList();
private Open openmsg;
}).when(this.clientListener).writeAndFlush(any(Notification.class));
doReturn("TestingChannel").when(this.clientListener).toString();
doReturn(this.pipeline).when(this.clientListener).pipeline();
+ doReturn(this.address).when(this.clientListener).localAddress();
+ doReturn(this.address).when(this.clientListener).remoteAddress();
doReturn(this.pipeline).when(this.pipeline).replace(any(ChannelHandler.class), any(String.class), any(ChannelHandler.class));
doReturn(true).when(this.clientListener).isActive();
doReturn(mock(ChannelFuture.class)).when(this.clientListener).close();
new SessionBuilder().setOpen(
new org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.OpenBuilder().setKeepalive(
(short) 1).build()).build()).build()).setErrors(
- Arrays.asList(new ErrorsBuilder().setErrorObject(
- new ErrorObjectBuilder().setType(maping.getFromErrorsEnum(e).type).setValue(
- maping.getFromErrorsEnum(e).value).build()).build())).build()).build();
+ Arrays.asList(new ErrorsBuilder().setErrorObject(
+ new ErrorObjectBuilder().setType(maping.getFromErrorsEnum(e).type).setValue(
+ maping.getFromErrorsEnum(e).value).build()).build())).build()).build();
}
/**