BUG-197: improve session ID tracking
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / AbstractPCEPSessionNegotiatorFactory.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.pcep.impl;
9
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.Promise;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import javax.annotation.concurrent.GuardedBy;

import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import org.opendaylight.protocol.framework.SessionNegotiator;
import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.primitives.UnsignedBytes;
37
38 /**
39  * SessionNegotiator which takes care of making sure sessions between PCEP peers are kept unique. This needs to be
40  * further subclassed to provide either a client or server factory.
41  */
42 public abstract class AbstractPCEPSessionNegotiatorFactory implements
43 SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> {
44         private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
45         private static final Logger LOG = LoggerFactory.getLogger(AbstractPCEPSessionNegotiatorFactory.class);
46
47         /**
48          * The total amount of time we should remember a peer having been present, unless some other pressure
49          * forces us to forget about it due to {@link PEER_CACHE_SIZE}.
50          */
51         private static final long PEER_CACHE_SECONDS = 24 * 3600;
52
53         /**
54          * Maximum total number of peers we keep track of. Combined with {@link PEER_CACHE_SECONDS}, this defines
55          * how many peers we can see turn around.
56          */
57         private static final long PEER_CACHE_SIZE = 1024;
58
59         /**
60          * The maximum lifetime for which we should hold on to a session ID before assuming it is okay to reuse
61          * it.
62          */
63         private static final long ID_CACHE_SECONDS = 3*3600;
64
65         @GuardedBy("this")
66         private final BiMap<byte[], SessionReference> sessions = HashBiMap.create();
67
68         @GuardedBy("this")
69         private final Cache<byte[], PeerRecord> formerClients = CacheBuilder.newBuilder().
70         expireAfterAccess(PEER_CACHE_SECONDS, TimeUnit.SECONDS).maximumSize(PEER_CACHE_SIZE).build();
71
72         private interface SessionReference extends AutoCloseable {
73                 Short getSessionId();
74         }
75
76         /**
77          * Create a new negotiator. This method needs to be implemented by subclasses to actually provide a negotiator.
78          * 
79          * @param promise Session promise to be completed by the negotiator
80          * @param channel Associated channel
81          * @param sessionId Session ID assigned to the resulting session
82          * @return a PCEP session negotiator
83          */
84         protected abstract AbstractPCEPSessionNegotiator createNegotiator(Promise<PCEPSessionImpl> promise, PCEPSessionListener listener,
85                         Channel channel, short sessionId);
86
87         @Override
88         public final SessionNegotiator<PCEPSessionImpl> getSessionNegotiator(final SessionListenerFactory<PCEPSessionListener> factory,
89                         final Channel channel, final Promise<PCEPSessionImpl> promise) {
90
91                 final Object lock = this;
92
93                 LOG.debug("Instantiating bootstrap negotiator for channel {}", channel);
94                 return new AbstractSessionNegotiator<Message, PCEPSessionImpl>(promise, channel) {
95                         @Override
96                         protected void startNegotiation() throws ExecutionException {
97                                 LOG.debug("Bootstrap negotiation for channel {} started", this.channel);
98
99                                 /*
100                                  * We have a chance to see if there's a client session already
101                                  * registered for this client.
102                                  */
103                                 final byte[] clientAddress = ((InetSocketAddress) this.channel.remoteAddress()).getAddress().getAddress();
104
105                                 synchronized (lock) {
106                                         if (AbstractPCEPSessionNegotiatorFactory.this.sessions.containsKey(clientAddress)) {
107                                                 final byte[] serverAddress = ((InetSocketAddress) this.channel.localAddress()).getAddress().getAddress();
108                                                 if (COMPARATOR.compare(serverAddress, clientAddress) > 0) {
109                                                         final SessionReference n = AbstractPCEPSessionNegotiatorFactory.this.sessions.remove(clientAddress);
110                                                         try {
111                                                                 n.close();
112                                                         } catch (final Exception e) {
113                                                                 LOG.error("Unexpected failure to close old session", e);
114                                                         }
115                                                 } else {
116                                                         negotiationFailed(new RuntimeException("A conflicting session for address "
117                                                                         + ((InetSocketAddress) this.channel.remoteAddress()).getAddress() + " found."));
118                                                         return;
119                                                 }
120                                         }
121
122                                         final Short sessionId = nextSession(clientAddress);
123                                         final AbstractPCEPSessionNegotiator n = createNegotiator(promise, factory.getSessionListener(), this.channel, sessionId);
124
125                                         AbstractPCEPSessionNegotiatorFactory.this.sessions.put(clientAddress, new SessionReference() {
126                                                 @Override
127                                                 public void close() throws ExecutionException {
128                                                         try {
129                                                                 formerClients.get(clientAddress, new Callable<PeerRecord>() {
130                                                                         @Override
131                                                                         public PeerRecord call() {
132                                                                                 return new PeerRecord(ID_CACHE_SECONDS, getSessionId());
133                                                                         }
134                                                                 });
135                                                         } finally {
136                                                                 channel.close();
137                                                         }
138                                                 }
139
140                                                 @Override
141                                                 public Short getSessionId() {
142                                                         return sessionId;
143                                                 }
144                                         });
145
146                                         this.channel.closeFuture().addListener(new ChannelFutureListener() {
147                                                 @Override
148                                                 public void operationComplete(final ChannelFuture future) throws Exception {
149                                                         synchronized (lock) {
150                                                                 AbstractPCEPSessionNegotiatorFactory.this.sessions.inverse().remove(this);
151                                                         }
152                                                 }
153                                         });
154
155                                         LOG.info("Replacing bootstrap negotiator for channel {}", this.channel);
156                                         this.channel.pipeline().replace(this, "negotiator", n);
157                                         n.startNegotiation();
158                                 }
159                         }
160
161                         @Override
162                         protected void handleMessage(final Message msg) throws Exception {
163                                 throw new IllegalStateException("Bootstrap negotiator should have been replaced");
164                         }
165                 };
166         }
167
168         @GuardedBy("this")
169         private Short nextSession(final byte[] clientAddress) throws ExecutionException {
170                 final PeerRecord peer = formerClients.get(clientAddress, new Callable<PeerRecord>() {
171                         @Override
172                         public PeerRecord call() throws Exception {
173                                 return new PeerRecord(ID_CACHE_SECONDS, null);
174                         }
175                 });
176
177                 return peer.allocId();
178         }
179 }