BUG-58: refactor to take advantage of netty
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / AbstractPCEPSessionNegotiatorFactory.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.pcep.impl;
9
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.Promise;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Map;
import java.util.WeakHashMap;

import javax.annotation.concurrent.GuardedBy;

import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import org.opendaylight.protocol.framework.SessionNegotiator;
import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
import org.opendaylight.protocol.pcep.PCEPMessage;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.primitives.UnsignedBytes;
36
37 /**
38  * SessionNegotiator which takes care of making sure sessions between PCEP
39  * peers are kept unique. This needs to be further subclassed to provide
40  * either a client or server factory.
41  */
42 public abstract class AbstractPCEPSessionNegotiatorFactory implements SessionNegotiatorFactory<PCEPMessage, PCEPSessionImpl, PCEPSessionListener> {
43         private static final Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();
44         private static final Logger logger = LoggerFactory.getLogger(AbstractPCEPSessionNegotiatorFactory.class);
45         private final BiMap<byte[], Closeable> sessions = HashBiMap.create();
46         private final Map<byte[], Short> sessionIds = new WeakHashMap<>();
47
48         /**
49          * Create a new negotiator. This method needs to be implemented by
50          * subclasses to actually provide a negotiator.
51          * 
52          * @param promise Session promise to be completed by the negotiator
53          * @param channel Associated channel
54          * @param sessionId Session ID assigned to the resulting session
55          * @return a PCEP session negotiator
56          */
57         protected abstract AbstractPCEPSessionNegotiator createNegotiator(Promise<PCEPSessionImpl> promise, PCEPSessionListener listener,
58                         Channel channel, short sessionId);
59
60         @Override
61         public final SessionNegotiator<PCEPSessionImpl> getSessionNegotiator(final SessionListenerFactory<PCEPSessionListener> factory,
62                         final Channel channel, final Promise<PCEPSessionImpl> promise) {
63
64                 final Object lock = this;
65
66                 logger.debug("Instantiating bootstrap negotiator for channel {}", channel);
67                 return new AbstractSessionNegotiator<PCEPMessage, PCEPSessionImpl>(promise, channel) {
68                         @Override
69                         protected void startNegotiation() throws Exception {
70                                 logger.debug("Bootstrap negotiation for channel {} started", channel);
71
72                                 /*
73                                  * We have a chance to see if there's a client session already
74                                  * registered for this client.
75                                  */
76                                 final byte[] clientAddress = ((InetSocketAddress) channel.remoteAddress()).getAddress().getAddress();
77
78                                 synchronized (lock) {
79
80                                         if (sessions.containsKey(clientAddress)) {
81                                                 // FIXME: cross-reference this to RFC5440
82
83                                                 final byte[] serverAddress = ((InetSocketAddress) channel.localAddress()).getAddress().getAddress();
84                                                 if (comparator.compare(serverAddress, clientAddress) > 0) {
85                                                         final Closeable n = sessions.remove(clientAddress);
86                                                         try {
87                                                                 n.close();
88                                                         } catch (IOException e) {
89                                                                 logger.warn("Unexpected failure to close old session", e);
90                                                         }
91                                                 } else {
92                                                         negotiationFailed(new RuntimeException("A conflicting session for address " +
93                                                                         ((InetSocketAddress) channel.remoteAddress()).getAddress() + " found."));
94                                                         return;
95                                                 }
96                                         }
97
98                                         final short sessionId = nextSession(clientAddress);
99                                         final AbstractPCEPSessionNegotiator n = createNegotiator(promise, factory.getSessionListener(), channel, sessionId);
100
101                                         sessions.put(clientAddress, new Closeable() {
102                                                 @Override
103                                                 public void close() {
104                                                         channel.close();
105                                                 }});
106
107                                         channel.closeFuture().addListener(new ChannelFutureListener() {
108                                                 @Override
109                                                 public void operationComplete(final ChannelFuture future) throws Exception {
110                                                         synchronized (lock) {
111                                                                 sessions.inverse().remove(this);
112                                                         }
113                                                 }
114                                         });
115
116                                         logger.debug("Replacing bootstrap negotiator for channel {}", channel);
117                                         channel.pipeline().replace(this, "negotiator", n);
118                                         n.startNegotiation();
119                                 }
120                         }
121
122                         @Override
123                         protected void handleMessage(final PCEPMessage msg) throws Exception {
124                                 throw new IllegalStateException("Bootstrap negotiator should have been replaced");
125                         }
126                 };
127         }
128
129         @GuardedBy("this")
130         private short nextSession(final byte[] clientAddress) {
131                 /*
132                  * FIXME: Improve the allocation algorithm to make sure:
133                  * - no duplicate IDs are assigned
134                  * - we retain former session IDs for a reasonable time
135                  */
136                 Short next = sessionIds.get(clientAddress);
137                 if (next == null) {
138                         next = 0;
139                 }
140
141                 sessionIds.put(clientAddress, (short)((next + 1) % 255));
142                 return next;
143         }
144 }