Disable to create duplicate (with same IP Address) PCEP sessions. 64/14964/4
authorMilos Fabian <milfabia@cisco.com>
Wed, 10 Dec 2014 15:21:54 +0000 (16:21 +0100)
committerMilos Fabian <milfabia@cisco.com>
Tue, 10 Feb 2015 16:34:02 +0000 (17:34 +0100)
Moved duplicate session handling into AbstractSessionNegtiatorFactory (PCEPPeerRegistry),
to be usable for all session negotiators created by. PCEPPeerRegistry also handle session-id caching.

Session ref. entries stored in bi-map were identifed by byte array (raw IP Address of client),
casuing that already existing session in bi-map were not look-up properly => allowing to create duplicate sessions.
Changed type of bi-map's key to wrapper of byte array.

Fixed also removing of session refs. from map on channel close. Turned bi-map to map, since inverse map is not used anymore.

pcc-mock is reusing this code - need to create session negotiator factory per pcc.

Change-Id: I85670b083b6ea832f8b9a4891c812845174f03ff
Signed-off-by: Milos Fabian <milfabia@cisco.com>
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiatorFactory.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPPeerRegistry.java [new file with mode: 0644]
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionNegotiator.java
pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/PCEPDispatcherImplTest.java
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/Main.java
pcep/pcc-mock/src/test/java/org/opendaylight/protocol/pcep/pcc/mock/PCCMockTest.java

index b288df37729df5b55a7e06d9e85b819f90c764d4..c31f458182bb75533f785585418d9b0f83c13001 100644 (file)
@@ -26,6 +26,8 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractPCEPSessionNegotiatorFactory.class);
 
+    private PCEPPeerRegistry sessionRegistry = new PCEPPeerRegistry();
+
     /**
      * Create a new negotiator. This method needs to be implemented by subclasses to actually provide a negotiator.
      *
@@ -45,4 +47,7 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements
         return new PCEPSessionNegotiator(channel, promise, factory, this);
     }
 
+    public PCEPPeerRegistry getSessionRegistry() {
+        return this.sessionRegistry;
+    }
 }
diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPPeerRegistry.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPPeerRegistry.java
new file mode 100644 (file)
index 0000000..6263936
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.pcep.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+@ThreadSafe
+final class PCEPPeerRegistry {
+
+    /**
+     * The maximum lifetime for which we should hold on to a session ID before assuming it is okay to reuse it.
+     */
+    private static final long ID_CACHE_SECONDS = 3 * 3600;
+
+    /**
+     * The total amount of time we should remember a peer having been present, unless some other pressure forces us to
+     * forget about it due to {@link PEER_CACHE_SIZE}.
+     */
+    private static final long PEER_CACHE_SECONDS = 24 * 3600;
+
+    /**
+     * Maximum total number of peers we keep track of. Combined with {@link PEER_CACHE_SECONDS}, this defines how many
+     * peers we can see turn around.
+     */
+    private static final long PEER_CACHE_SIZE = 1024;
+
+    @GuardedBy("this")
+    private final Cache<ByteArrayWrapper, PeerRecord> formerClients = CacheBuilder.newBuilder().expireAfterAccess(PEER_CACHE_SECONDS,
+            TimeUnit.SECONDS).maximumSize(PEER_CACHE_SIZE).build();
+
+    @GuardedBy("this")
+    private final Map<ByteArrayWrapper, SessionReference> sessions = new HashMap<>();
+
+    protected interface SessionReference extends AutoCloseable {
+        Short getSessionId();
+    }
+
+
+    protected synchronized Optional<SessionReference> getSessionReference(final byte[] clientAddress) {
+        final SessionReference sessionReference = this.sessions.get(new ByteArrayWrapper(clientAddress));
+        if (sessionReference != null) {
+            return Optional.of(sessionReference);
+        }
+        return Optional.absent();
+    }
+
+    protected synchronized Optional<SessionReference> removeSessionReference(final byte[] clientAddress) {
+        final SessionReference sessionReference = this.sessions.remove(new ByteArrayWrapper(clientAddress));
+        if (sessionReference != null) {
+            return Optional.of(sessionReference);
+        }
+        return Optional.absent();
+    }
+
+    protected synchronized void putSessionReference(final byte[] clientAddress, final SessionReference sessionReference) {
+        this.sessions.put(new ByteArrayWrapper(clientAddress), sessionReference);
+    }
+
+    protected synchronized Short nextSession(final byte[] clientAddress) throws ExecutionException {
+        final PeerRecord peer = this.formerClients.get(new ByteArrayWrapper(clientAddress), new Callable<PeerRecord>() {
+            @Override
+            public PeerRecord call() {
+                return new PeerRecord(ID_CACHE_SECONDS, null);
+            }
+        });
+
+        return peer.allocId();
+    }
+
+    protected synchronized void releaseSession(final byte[] clientAddress, final short sessionId) throws ExecutionException {
+        this.formerClients.get(new ByteArrayWrapper(clientAddress), new Callable<PeerRecord>() {
+            @Override
+            public PeerRecord call() {
+                return new PeerRecord(ID_CACHE_SECONDS, sessionId);
+            }
+        });
+    }
+
+    private static final class ByteArrayWrapper {
+
+        private final byte[] byteArray;
+
+        public ByteArrayWrapper(final byte[] byteArray) {
+            this.byteArray = byteArray;
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(byteArray);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof ByteArrayWrapper)) {
+                return false;
+            }
+            return Arrays.equals(byteArray, ((ByteArrayWrapper) obj).byteArray);
+        }
+    }
+}
index 5fcb0a1171658bcad9647faaa9fb544cc9ca22af..610b545987a060787336a8b4659cb5c0279eea51 100644 (file)
@@ -7,10 +7,7 @@
  */
 package org.opendaylight.protocol.pcep.impl;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
+import com.google.common.base.Optional;
 import com.google.common.primitives.UnsignedBytes;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -18,13 +15,11 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import java.util.Comparator;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
+import org.opendaylight.protocol.pcep.impl.PCEPPeerRegistry.SessionReference;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,27 +30,6 @@ public class PCEPSessionNegotiator extends AbstractSessionNegotiator<Message, PC
 
     private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
 
-    /**
-     * The maximum lifetime for which we should hold on to a session ID before assuming it is okay to reuse it.
-     */
-    private static final long ID_CACHE_SECONDS = 3 * 3600;
-
-    /**
-     * The total amount of time we should remember a peer having been present, unless some other pressure forces us to
-     * forget about it due to {@link PEER_CACHE_SIZE}.
-     */
-    private static final long PEER_CACHE_SECONDS = 24 * 3600;
-
-    /**
-     * Maximum total number of peers we keep track of. Combined with {@link PEER_CACHE_SECONDS}, this defines how many
-     * peers we can see turn around.
-     */
-    private static final long PEER_CACHE_SIZE = 1024;
-
-    @GuardedBy("this")
-    private final Cache<byte[], PeerRecord> formerClients = CacheBuilder.newBuilder().expireAfterAccess(PEER_CACHE_SECONDS,
-            TimeUnit.SECONDS).maximumSize(PEER_CACHE_SIZE).build();
-
     private final Channel channel;
 
     private final Promise<PCEPSessionImpl> promise;
@@ -64,13 +38,6 @@ public class PCEPSessionNegotiator extends AbstractSessionNegotiator<Message, PC
 
     private final AbstractPCEPSessionNegotiatorFactory negFactory;
 
-    @GuardedBy("this")
-    private final BiMap<byte[], SessionReference> sessions = HashBiMap.create();
-
-    private interface SessionReference extends AutoCloseable {
-        Short getSessionId();
-    }
-
     public PCEPSessionNegotiator(final Channel channel, final Promise<PCEPSessionImpl> promise, final SessionListenerFactory<PCEPSessionListener> factory,
         final AbstractPCEPSessionNegotiatorFactory negFactory) {
         super(promise, channel);
@@ -91,14 +58,17 @@ public class PCEPSessionNegotiator extends AbstractSessionNegotiator<Message, PC
          * registered for this client.
          */
         final byte[] clientAddress = ((InetSocketAddress) this.channel.remoteAddress()).getAddress().getAddress();
+        final PCEPPeerRegistry sessionReg = this.negFactory.getSessionRegistry();
 
         synchronized (lock) {
-            if (this.sessions.containsKey(clientAddress)) {
+            if (sessionReg.getSessionReference(clientAddress).isPresent()) {
                 final byte[] serverAddress = ((InetSocketAddress) this.channel.localAddress()).getAddress().getAddress();
                 if (COMPARATOR.compare(serverAddress, clientAddress) > 0) {
-                    final SessionReference n = this.sessions.remove(clientAddress);
+                    final Optional<SessionReference> sessionRefMaybe = sessionReg.removeSessionReference(clientAddress);
                     try {
-                        n.close();
+                        if (sessionRefMaybe.isPresent()) {
+                            sessionRefMaybe.get().close();
+                        }
                     } catch (final Exception e) {
                         LOG.error("Unexpected failure to close old session", e);
                     }
@@ -109,19 +79,14 @@ public class PCEPSessionNegotiator extends AbstractSessionNegotiator<Message, PC
                 }
             }
 
-            final Short sessionId = nextSession(clientAddress);
+            final Short sessionId = sessionReg.nextSession(clientAddress);
             final AbstractPCEPSessionNegotiator n = this.negFactory.createNegotiator(this.promise, this.factory.getSessionListener(), this.channel, sessionId);
 
-            this.sessions.put(clientAddress, new SessionReference() {
+            sessionReg.putSessionReference(clientAddress, new SessionReference() {
                 @Override
                 public void close() throws ExecutionException {
                     try {
-                        PCEPSessionNegotiator.this.formerClients.get(clientAddress, new Callable<PeerRecord>() {
-                            @Override
-                            public PeerRecord call() {
-                                return new PeerRecord(ID_CACHE_SECONDS, getSessionId());
-                            }
-                        });
+                        sessionReg.releaseSession(clientAddress, sessionId);
                     } finally {
                         PCEPSessionNegotiator.this.channel.close();
                     }
@@ -137,7 +102,7 @@ public class PCEPSessionNegotiator extends AbstractSessionNegotiator<Message, PC
                 @Override
                 public void operationComplete(final ChannelFuture future) {
                     synchronized (lock) {
-                        PCEPSessionNegotiator.this.sessions.inverse().remove(this);
+                        sessionReg.removeSessionReference(clientAddress);
                     }
                 }
             });
@@ -148,18 +113,6 @@ public class PCEPSessionNegotiator extends AbstractSessionNegotiator<Message, PC
         }
     }
 
-    @GuardedBy("this")
-    protected Short nextSession(final byte[] clientAddress) throws ExecutionException {
-        final PeerRecord peer = this.formerClients.get(clientAddress, new Callable<PeerRecord>() {
-            @Override
-            public PeerRecord call() {
-                return new PeerRecord(ID_CACHE_SECONDS, null);
-            }
-        });
-
-        return peer.allocId();
-    }
-
     @Override
     protected void handleMessage(final Message msg) {
         throw new IllegalStateException("Bootstrap negotiator should have been replaced");
index f513a2a29818bdb3ce31c84378ac91a4f46d9667..6c4d4e8c2acd8cafcc7389ab576262847171ebfe 100644 (file)
@@ -53,14 +53,14 @@ public class PCEPDispatcherImplTest {
     public void setUp() {
         final Open open = new OpenBuilder().setSessionId((short) 0).setDeadTimer(DEAD_TIMER).setKeepalive(KEEP_ALIVE)
                 .build();
-        final SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> snf = new DefaultPCEPSessionNegotiatorFactory(
-                open, 0);
         final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
         final MessageRegistry msgReg = ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance()
                 .getMessageHandlerRegistry();
-        this.dispatcher = new PCEPDispatcherImpl(msgReg, snf, eventLoopGroup, eventLoopGroup);
-        this.pccMock = new PCCMock<>(snf, new PCEPHandlerFactory(msgReg), new DefaultPromise<PCEPSessionImpl>(
-                GlobalEventExecutor.INSTANCE));
+        this.dispatcher = new PCEPDispatcherImpl(msgReg, new DefaultPCEPSessionNegotiatorFactory(open, 0),
+                eventLoopGroup, eventLoopGroup);
+        this.pccMock = new PCCMock<>(new DefaultPCEPSessionNegotiatorFactory(open, 0),
+                new PCEPHandlerFactory(msgReg), new DefaultPromise<PCEPSessionImpl>(
+                        GlobalEventExecutor.INSTANCE));
     }
 
     @Test
@@ -72,7 +72,7 @@ public class PCEPDispatcherImplTest {
                         return new SimpleSessionListener();
                     }
                 });
-        final PCEPSessionImpl session1 = pccMock.createClient(CLIENT1_ADDRESS,
+        final PCEPSessionImpl session1 = this.pccMock.createClient(CLIENT1_ADDRESS,
                 new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
                 new SessionListenerFactory<PCEPSessionListener>() {
                     @Override
@@ -81,7 +81,7 @@ public class PCEPDispatcherImplTest {
                     }
                 }).get();
 
-        final PCEPSessionImpl session2 = pccMock.createClient(CLIENT2_ADDRESS,
+        final PCEPSessionImpl session2 = this.pccMock.createClient(CLIENT2_ADDRESS,
                 new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
                 new SessionListenerFactory<PCEPSessionListener>() {
                     @Override
@@ -102,8 +102,80 @@ public class PCEPDispatcherImplTest {
         session1.close();
         session2.close();
         Assert.assertTrue(futureChannel.channel().isActive());
+    }
+
+    @Test
+    public void testCreateDuplicateClient() throws InterruptedException, ExecutionException {
+        this.dispatcher.createServer(new InetSocketAddress("0.0.0.0", PORT),
+                new SessionListenerFactory<PCEPSessionListener>() {
+                    @Override
+                    public PCEPSessionListener getSessionListener() {
+                        return new SimpleSessionListener();
+                    }
+                });
+        final PCEPSessionImpl session1 = this.pccMock.createClient(CLIENT1_ADDRESS,
+                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
+                new SessionListenerFactory<PCEPSessionListener>() {
+                    @Override
+                    public PCEPSessionListener getSessionListener() {
+                        return new SimpleSessionListener();
+                    }
+                }).get();
+
+        try {
+            this.pccMock.createClient(CLIENT1_ADDRESS,
+                    new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
+                    new SessionListenerFactory<PCEPSessionListener>() {
+                        @Override
+                        public PCEPSessionListener getSessionListener() {
+                            return new SimpleSessionListener();
+                        }
+                    }).get();
+            Assert.fail();
+        } catch(ExecutionException e) {
+            Assert.assertTrue(e.getMessage().contains("A conflicting session for address"));
+        } finally {
+            session1.close();
+        }
+    }
+
+    @Test
+    public void testReconectClient() throws InterruptedException, ExecutionException {
+        this.dispatcher.createServer(new InetSocketAddress("0.0.0.0", PORT),
+                new SessionListenerFactory<PCEPSessionListener>() {
+                    @Override
+                    public PCEPSessionListener getSessionListener() {
+                        return new SimpleSessionListener();
+                    }
+                });
+        final PCEPSessionImpl session1 = this.pccMock.createClient(CLIENT1_ADDRESS,
+                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
+                new SessionListenerFactory<PCEPSessionListener>() {
+                    @Override
+                    public PCEPSessionListener getSessionListener() {
+                        return new SimpleSessionListener();
+                    }
+                }).get();
+
+        Assert.assertEquals(CLIENT1_ADDRESS.getAddress(), session1.getRemoteAddress());
+        Assert.assertEquals(DEAD_TIMER, session1.getDeadTimerValue().shortValue());
+        Assert.assertEquals(KEEP_ALIVE, session1.getKeepAliveTimerValue().shortValue());
+        session1.close();
 
-        futureChannel.channel().close();
+        final PCEPSessionImpl session2 = this.pccMock.createClient(CLIENT1_ADDRESS,
+                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 500),
+                new SessionListenerFactory<PCEPSessionListener>() {
+                    @Override
+                    public PCEPSessionListener getSessionListener() {
+                        return new SimpleSessionListener();
+                    }
+                }).get();
+
+        Assert.assertEquals(CLIENT1_ADDRESS.getAddress(), session1.getRemoteAddress());
+        Assert.assertEquals(DEAD_TIMER, session2.getDeadTimerValue().shortValue());
+        Assert.assertEquals(KEEP_ALIVE, session2.getKeepAliveTimerValue().shortValue());
+
+        session2.close();
     }
 
     @After
index 9bc8bc99a03a4e583a80ed6e68f072be4d7f41c4..8623214abb06d0d824624ea91be2d79db1215603 100644 (file)
@@ -86,19 +86,18 @@ public final class Main {
 
     public static void createPCCs(final int lspsPerPcc, final boolean pcerr, final int pccCount,
             final InetAddress localAddress, final List<InetAddress> remoteAddress, final short keepalive, final short deadtimer) throws InterruptedException, ExecutionException {
-        final SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> snf = new DefaultPCEPSessionNegotiatorFactory(
-                new OpenBuilder().setKeepalive(keepalive).setDeadTimer(deadtimer).setSessionId((short) 0).build(), 0);
-
         final StatefulActivator activator07 = new StatefulActivator();
         activator07.start(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance());
-        final PCCMock<Message, PCEPSessionImpl, PCEPSessionListener> pcc = new PCCMock<>(snf, new PCEPHandlerFactory(
-                ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry()));
-
         for (final InetAddress pceAddress : remoteAddress) {
             InetAddress currentAddress = localAddress;
             int i = 0;
             while (i < pccCount) {
                 final InetAddress pccAddress = currentAddress;
+                final SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> snf = new DefaultPCEPSessionNegotiatorFactory(
+                        new OpenBuilder().setKeepalive(keepalive).setDeadTimer(deadtimer).setSessionId((short) 0).build(), 0);
+
+                final PCCMock<Message, PCEPSessionImpl, PCEPSessionListener> pcc = new PCCMock<Message, PCEPSessionImpl, PCEPSessionListener>(snf, new PCEPHandlerFactory(
+                        ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry()));
                 pcc.createClient(new InetSocketAddress(pccAddress, 0), new InetSocketAddress(pceAddress, DEFAULT_PORT),
                         new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, RECONNECT_STRATEGY_TIMEOUT),
                         new SessionListenerFactory<PCEPSessionListener>() {
index 225b673e036195ca6269c62aef5aac8096636f95..401369e10a9050a00561787a3ff474f7a31a04bf 100644 (file)
@@ -42,7 +42,7 @@ public class PCCMockTest {
             org.opendaylight.protocol.pcep.testtool.Main.main(new String[]{"-a", "127.0.4.0:4189", "-ka", "10", "-d", "0", "--stateful", "--active"});
             org.opendaylight.protocol.pcep.testtool.Main.main(new String[]{"-a", "127.0.2.0:4189", "-ka", "10", "-d", "0", "--stateful", "--active"});
             org.opendaylight.protocol.pcep.testtool.Main.main(new String[]{"-a", "127.0.3.0:4189", "-ka", "10", "-d", "0", "--stateful", "--active"});
-            Main.main(new String[] {"--local-address", "127.0.0.1", "--remote-address", "127.0.4.0,127.0.2.0,127.0.3.0"});
+            Main.main(new String[] {"--local-address", "127.0.0.1", "--remote-address", "127.0.4.0,127.0.2.0,127.0.3.0", "--pcc", "3"});
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }