Moved duplicate session handling into AbstractSessionNegtiatorFactory (PCEPPeerRegistry),
to be usable for all session negotiators created by. PCEPPeerRegistry also handle session-id caching.
Session ref. entries stored in bi-map were identifed by byte array (raw IP Address of client),
casuing that already existing session in bi-map were not look-up properly => allowing to create duplicate sessions.
Changed type of bi-map's key to wrapper of byte array.
Fixed also removing of session refs. from map on channel close. Turned bi-map to map, since inverse map is not used anymore.
pcc-mock is reusing this code - need to create session negotiator factory per pcc.
Change-Id: I85670b083b6ea832f8b9a4891c812845174f03ff
Signed-off-by: Milos Fabian <milfabia@cisco.com>
private static final Logger LOG = LoggerFactory.getLogger(AbstractPCEPSessionNegotiatorFactory.class);
+ private PCEPPeerRegistry sessionRegistry = new PCEPPeerRegistry();
+
/**
* Create a new negotiator. This method needs to be implemented by subclasses to actually provide a negotiator.
*
return new PCEPSessionNegotiator(channel, promise, factory, this);
}
+ public PCEPPeerRegistry getSessionRegistry() {
+ return this.sessionRegistry;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.pcep.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+@ThreadSafe
+final class PCEPPeerRegistry {
+
+ /**
+ * The maximum lifetime for which we should hold on to a session ID before assuming it is okay to reuse it.
+ */
+ private static final long ID_CACHE_SECONDS = 3 * 3600;
+
+ /**
+ * The total amount of time we should remember a peer having been present, unless some other pressure forces us to
+ * forget about it due to {@link PEER_CACHE_SIZE}.
+ */
+ private static final long PEER_CACHE_SECONDS = 24 * 3600;
+
+ /**
+ * Maximum total number of peers we keep track of. Combined with {@link PEER_CACHE_SECONDS}, this defines how many
+ * peers we can see turn around.
+ */
+ private static final long PEER_CACHE_SIZE = 1024;
+
+ @GuardedBy("this")
+ private final Cache<ByteArrayWrapper, PeerRecord> formerClients = CacheBuilder.newBuilder().expireAfterAccess(PEER_CACHE_SECONDS,
+ TimeUnit.SECONDS).maximumSize(PEER_CACHE_SIZE).build();
+
+ @GuardedBy("this")
+ private final Map<ByteArrayWrapper, SessionReference> sessions = new HashMap<>();
+
+ protected interface SessionReference extends AutoCloseable {
+ Short getSessionId();
+ }
+
+
+ protected synchronized Optional<SessionReference> getSessionReference(final byte[] clientAddress) {
+ final SessionReference sessionReference = this.sessions.get(new ByteArrayWrapper(clientAddress));
+ if (sessionReference != null) {
+ return Optional.of(sessionReference);
+ }
+ return Optional.absent();
+ }
+
+ protected synchronized Optional<SessionReference> removeSessionReference(final byte[] clientAddress) {
+ final SessionReference sessionReference = this.sessions.remove(new ByteArrayWrapper(clientAddress));
+ if (sessionReference != null) {
+ return Optional.of(sessionReference);
+ }
+ return Optional.absent();
+ }
+
+ protected synchronized void putSessionReference(final byte[] clientAddress, final SessionReference sessionReference) {
+ this.sessions.put(new ByteArrayWrapper(clientAddress), sessionReference);
+ }
+
+ protected synchronized Short nextSession(final byte[] clientAddress) throws ExecutionException {
+ final PeerRecord peer = this.formerClients.get(new ByteArrayWrapper(clientAddress), new Callable<PeerRecord>() {
+ @Override
+ public PeerRecord call() {
+ return new PeerRecord(ID_CACHE_SECONDS, null);
+ }
+ });
+
+ return peer.allocId();
+ }
+
+ protected synchronized void releaseSession(final byte[] clientAddress, final short sessionId) throws ExecutionException {
+ this.formerClients.get(new ByteArrayWrapper(clientAddress), new Callable<PeerRecord>() {
+ @Override
+ public PeerRecord call() {
+ return new PeerRecord(ID_CACHE_SECONDS, sessionId);
+ }
+ });
+ }
+
+ private static final class ByteArrayWrapper {
+
+ private final byte[] byteArray;
+
+ public ByteArrayWrapper(final byte[] byteArray) {
+ this.byteArray = byteArray;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(byteArray);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof ByteArrayWrapper)) {
+ return false;
+ }
+ return Arrays.equals(byteArray, ((ByteArrayWrapper) obj).byteArray);
+ }
+ }
+}
*/
package org.opendaylight.protocol.pcep.impl;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
+import com.google.common.base.Optional;
import com.google.common.primitives.UnsignedBytes;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.util.Comparator;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
+import org.opendaylight.protocol.pcep.impl.PCEPPeerRegistry.SessionReference;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
- /**
- * The maximum lifetime for which we should hold on to a session ID before assuming it is okay to reuse it.
- */
- private static final long ID_CACHE_SECONDS = 3 * 3600;
-
- /**
- * The total amount of time we should remember a peer having been present, unless some other pressure forces us to
- * forget about it due to {@link PEER_CACHE_SIZE}.
- */
- private static final long PEER_CACHE_SECONDS = 24 * 3600;
-
- /**
- * Maximum total number of peers we keep track of. Combined with {@link PEER_CACHE_SECONDS}, this defines how many
- * peers we can see turn around.
- */
- private static final long PEER_CACHE_SIZE = 1024;
-
- @GuardedBy("this")
- private final Cache<byte[], PeerRecord> formerClients = CacheBuilder.newBuilder().expireAfterAccess(PEER_CACHE_SECONDS,
- TimeUnit.SECONDS).maximumSize(PEER_CACHE_SIZE).build();
-
private final Channel channel;
private final Promise<PCEPSessionImpl> promise;
private final AbstractPCEPSessionNegotiatorFactory negFactory;
- @GuardedBy("this")
- private final BiMap<byte[], SessionReference> sessions = HashBiMap.create();
-
- private interface SessionReference extends AutoCloseable {
- Short getSessionId();
- }
-
public PCEPSessionNegotiator(final Channel channel, final Promise<PCEPSessionImpl> promise, final SessionListenerFactory<PCEPSessionListener> factory,
final AbstractPCEPSessionNegotiatorFactory negFactory) {
super(promise, channel);
* registered for this client.
*/
final byte[] clientAddress = ((InetSocketAddress) this.channel.remoteAddress()).getAddress().getAddress();
+ final PCEPPeerRegistry sessionReg = this.negFactory.getSessionRegistry();
synchronized (lock) {
- if (this.sessions.containsKey(clientAddress)) {
+ if (sessionReg.getSessionReference(clientAddress).isPresent()) {
final byte[] serverAddress = ((InetSocketAddress) this.channel.localAddress()).getAddress().getAddress();
if (COMPARATOR.compare(serverAddress, clientAddress) > 0) {
- final SessionReference n = this.sessions.remove(clientAddress);
+ final Optional<SessionReference> sessionRefMaybe = sessionReg.removeSessionReference(clientAddress);
try {
- n.close();
+ if (sessionRefMaybe.isPresent()) {
+ sessionRefMaybe.get().close();
+ }
} catch (final Exception e) {
LOG.error("Unexpected failure to close old session", e);
}
}
}
- final Short sessionId = nextSession(clientAddress);
+ final Short sessionId = sessionReg.nextSession(clientAddress);
final AbstractPCEPSessionNegotiator n = this.negFactory.createNegotiator(this.promise, this.factory.getSessionListener(), this.channel, sessionId);
- this.sessions.put(clientAddress, new SessionReference() {
+ sessionReg.putSessionReference(clientAddress, new SessionReference() {
@Override
public void close() throws ExecutionException {
try {
- PCEPSessionNegotiator.this.formerClients.get(clientAddress, new Callable<PeerRecord>() {
- @Override
- public PeerRecord call() {
- return new PeerRecord(ID_CACHE_SECONDS, getSessionId());
- }
- });
+ sessionReg.releaseSession(clientAddress, sessionId);
} finally {
PCEPSessionNegotiator.this.channel.close();
}
@Override
public void operationComplete(final ChannelFuture future) {
synchronized (lock) {
- PCEPSessionNegotiator.this.sessions.inverse().remove(this);
+ sessionReg.removeSessionReference(clientAddress);
}
}
});
}
}
- @GuardedBy("this")
- protected Short nextSession(final byte[] clientAddress) throws ExecutionException {
- final PeerRecord peer = this.formerClients.get(clientAddress, new Callable<PeerRecord>() {
- @Override
- public PeerRecord call() {
- return new PeerRecord(ID_CACHE_SECONDS, null);
- }
- });
-
- return peer.allocId();
- }
-
@Override
protected void handleMessage(final Message msg) {
throw new IllegalStateException("Bootstrap negotiator should have been replaced");
public void setUp() {
final Open open = new OpenBuilder().setSessionId((short) 0).setDeadTimer(DEAD_TIMER).setKeepalive(KEEP_ALIVE)
.build();
- final SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> snf = new DefaultPCEPSessionNegotiatorFactory(
- open, 0);
final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
final MessageRegistry msgReg = ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance()
.getMessageHandlerRegistry();
- this.dispatcher = new PCEPDispatcherImpl(msgReg, snf, eventLoopGroup, eventLoopGroup);
- this.pccMock = new PCCMock<>(snf, new PCEPHandlerFactory(msgReg), new DefaultPromise<PCEPSessionImpl>(
- GlobalEventExecutor.INSTANCE));
+ this.dispatcher = new PCEPDispatcherImpl(msgReg, new DefaultPCEPSessionNegotiatorFactory(open, 0),
+ eventLoopGroup, eventLoopGroup);
+ this.pccMock = new PCCMock<>(new DefaultPCEPSessionNegotiatorFactory(open, 0),
+ new PCEPHandlerFactory(msgReg), new DefaultPromise<PCEPSessionImpl>(
+ GlobalEventExecutor.INSTANCE));
}
@Test
return new SimpleSessionListener();
}
});
- final PCEPSessionImpl session1 = pccMock.createClient(CLIENT1_ADDRESS,
+ final PCEPSessionImpl session1 = this.pccMock.createClient(CLIENT1_ADDRESS,
new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
new SessionListenerFactory<PCEPSessionListener>() {
@Override
}
}).get();
- final PCEPSessionImpl session2 = pccMock.createClient(CLIENT2_ADDRESS,
+ final PCEPSessionImpl session2 = this.pccMock.createClient(CLIENT2_ADDRESS,
new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
new SessionListenerFactory<PCEPSessionListener>() {
@Override
session1.close();
session2.close();
Assert.assertTrue(futureChannel.channel().isActive());
+ }
+
+ @Test
+ public void testCreateDuplicateClient() throws InterruptedException, ExecutionException {
+ this.dispatcher.createServer(new InetSocketAddress("0.0.0.0", PORT),
+ new SessionListenerFactory<PCEPSessionListener>() {
+ @Override
+ public PCEPSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ });
+ final PCEPSessionImpl session1 = this.pccMock.createClient(CLIENT1_ADDRESS,
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
+ new SessionListenerFactory<PCEPSessionListener>() {
+ @Override
+ public PCEPSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ }).get();
+
+ try {
+ this.pccMock.createClient(CLIENT1_ADDRESS,
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
+ new SessionListenerFactory<PCEPSessionListener>() {
+ @Override
+ public PCEPSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ }).get();
+ Assert.fail();
+ } catch(ExecutionException e) {
+ Assert.assertTrue(e.getMessage().contains("A conflicting session for address"));
+ } finally {
+ session1.close();
+ }
+ }
+
+ @Test
+ public void testReconectClient() throws InterruptedException, ExecutionException {
+ this.dispatcher.createServer(new InetSocketAddress("0.0.0.0", PORT),
+ new SessionListenerFactory<PCEPSessionListener>() {
+ @Override
+ public PCEPSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ });
+ final PCEPSessionImpl session1 = this.pccMock.createClient(CLIENT1_ADDRESS,
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
+ new SessionListenerFactory<PCEPSessionListener>() {
+ @Override
+ public PCEPSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ }).get();
+
+ Assert.assertEquals(CLIENT1_ADDRESS.getAddress(), session1.getRemoteAddress());
+ Assert.assertEquals(DEAD_TIMER, session1.getDeadTimerValue().shortValue());
+ Assert.assertEquals(KEEP_ALIVE, session1.getKeepAliveTimerValue().shortValue());
+ session1.close();
- futureChannel.channel().close();
+ final PCEPSessionImpl session2 = this.pccMock.createClient(CLIENT1_ADDRESS,
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
+ new SessionListenerFactory<PCEPSessionListener>() {
+ @Override
+ public PCEPSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ }).get();
+
+ Assert.assertEquals(CLIENT1_ADDRESS.getAddress(), session1.getRemoteAddress());
+ Assert.assertEquals(DEAD_TIMER, session2.getDeadTimerValue().shortValue());
+ Assert.assertEquals(KEEP_ALIVE, session2.getKeepAliveTimerValue().shortValue());
+
+ session2.close();
}
@After
public static void createPCCs(final int lspsPerPcc, final boolean pcerr, final int pccCount,
final InetAddress localAddress, final List<InetAddress> remoteAddress, final short keepalive, final short deadtimer) throws InterruptedException, ExecutionException {
- final SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> snf = new DefaultPCEPSessionNegotiatorFactory(
- new OpenBuilder().setKeepalive(keepalive).setDeadTimer(deadtimer).setSessionId((short) 0).build(), 0);
-
final StatefulActivator activator07 = new StatefulActivator();
activator07.start(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance());
- final PCCMock<Message, PCEPSessionImpl, PCEPSessionListener> pcc = new PCCMock<>(snf, new PCEPHandlerFactory(
- ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry()));
-
for (final InetAddress pceAddress : remoteAddress) {
InetAddress currentAddress = localAddress;
int i = 0;
while (i < pccCount) {
final InetAddress pccAddress = currentAddress;
+ final SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> snf = new DefaultPCEPSessionNegotiatorFactory(
+ new OpenBuilder().setKeepalive(keepalive).setDeadTimer(deadtimer).setSessionId((short) 0).build(), 0);
+
+ final PCCMock<Message, PCEPSessionImpl, PCEPSessionListener> pcc = new PCCMock<Message, PCEPSessionImpl, PCEPSessionListener>(snf, new PCEPHandlerFactory(
+ ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry()));
pcc.createClient(new InetSocketAddress(pccAddress, 0), new InetSocketAddress(pceAddress, DEFAULT_PORT),
new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, RECONNECT_STRATEGY_TIMEOUT),
new SessionListenerFactory<PCEPSessionListener>() {
org.opendaylight.protocol.pcep.testtool.Main.main(new String[]{"-a", "127.0.4.0:4189", "-ka", "10", "-d", "0", "--stateful", "--active"});
org.opendaylight.protocol.pcep.testtool.Main.main(new String[]{"-a", "127.0.2.0:4189", "-ka", "10", "-d", "0", "--stateful", "--active"});
org.opendaylight.protocol.pcep.testtool.Main.main(new String[]{"-a", "127.0.3.0:4189", "-ka", "10", "-d", "0", "--stateful", "--active"});
- Main.main(new String[] {"--local-address", "127.0.0.1", "--remote-address", "127.0.4.0,127.0.2.0,127.0.3.0"});
+ Main.main(new String[] {"--local-address", "127.0.0.1", "--remote-address", "127.0.4.0,127.0.2.0,127.0.3.0", "--pcc", "3"});
} catch (Exception e) {
Assert.fail(e.getMessage());
}