Merge "Use ClassToInstanceMap instead of a HashMap"
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ConnectionAdapterImpl.java
1 /*
2  * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9
10 package org.opendaylight.openflowjava.protocol.impl.core.connection;
11
12 import com.google.common.base.Preconditions;
13 import io.netty.channel.Channel;
14 import java.net.InetSocketAddress;
15 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
17 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
18 import org.opendaylight.openflowjava.protocol.api.extensibility.AlienMessageListener;
19 import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector;
20 import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
35 import org.opendaylight.yangtools.yang.binding.DataObject;
36 import org.opendaylight.yangtools.yang.binding.Notification;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Handles messages (notifications + rpcs) and connections.
42  * @author mirehak
43  * @author michal.polkorab
44  */
45 public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics implements ConnectionFacade {
46
47     private static final Logger LOG = LoggerFactory.getLogger(ConnectionAdapterImpl.class);
48
49     private ConnectionReadyListener connectionReadyListener;
50     private OpenflowProtocolListener messageListener;
51     private SystemNotificationsListener systemListener;
52     private AlienMessageListener alienMessageListener;
53     private AbstractOutboundQueueManager<?, ?> outputManager;
54     private OFVersionDetector versionDetector;
55
56     private final boolean useBarrier;
57
58     /**
59      * Default constructor.
60      * @param channel the channel to be set - used for communication
61      * @param address client address (used only in case of UDP communication,
62      *                as there is no need to store address over tcp (stable channel))
63      * @param useBarrier value is configurable by configSubsytem
64      */
65     public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier,
66                                  final int channelOutboundQueueSize) {
67         super(channel, address, channelOutboundQueueSize);
68         this.useBarrier = useBarrier;
69         LOG.debug("ConnectionAdapter created");
70     }
71
72     @Override
73     public void setMessageListener(final OpenflowProtocolListener messageListener) {
74         this.messageListener = messageListener;
75     }
76
77     @Override
78     public void setConnectionReadyListener(final ConnectionReadyListener connectionReadyListener) {
79         this.connectionReadyListener = connectionReadyListener;
80     }
81
82     @Override
83     public void setSystemListener(final SystemNotificationsListener systemListener) {
84         this.systemListener = systemListener;
85     }
86
87     @Override
88     public void setAlienMessageListener(final AlienMessageListener alienMessageListener) {
89         this.alienMessageListener = alienMessageListener;
90     }
91
92     @Override
93     public void consumeDeviceMessage(final DataObject message) {
94         LOG.debug("ConsumeIntern msg on {}", channel);
95         if (disconnectOccured) {
96             return;
97         }
98         if (message instanceof Notification) {
99
100             // System events
101             if (message instanceof DisconnectEvent) {
102                 systemListener.onDisconnectEvent((DisconnectEvent) message);
103                 responseCache.invalidateAll();
104                 disconnectOccured = true;
105             } else if (message instanceof SwitchIdleEvent) {
106                 systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
107             // OpenFlow messages
108             } else if (message instanceof EchoRequestMessage) {
109                 if (outputManager != null) {
110                     outputManager.onEchoRequest((EchoRequestMessage) message);
111                 } else {
112                     messageListener.onEchoRequestMessage((EchoRequestMessage) message);
113                 }
114             } else if (message instanceof ErrorMessage) {
115                 // Send only unmatched errors
116                 if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
117                     messageListener.onErrorMessage((ErrorMessage) message);
118                 }
119             } else if (message instanceof ExperimenterMessage) {
120                 if (outputManager != null) {
121                     outputManager.onMessage((OfHeader) message);
122                 }
123                 messageListener.onExperimenterMessage((ExperimenterMessage) message);
124             } else if (message instanceof FlowRemovedMessage) {
125                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
126             } else if (message instanceof HelloMessage) {
127                 LOG.info("Hello received");
128                 messageListener.onHelloMessage((HelloMessage) message);
129             } else if (message instanceof MultipartReplyMessage) {
130                 if (outputManager != null) {
131                     outputManager.onMessage((OfHeader) message);
132                 }
133                 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
134             } else if (message instanceof PacketInMessage) {
135                 messageListener.onPacketInMessage((PacketInMessage) message);
136             } else if (message instanceof PortStatusMessage) {
137                 messageListener.onPortStatusMessage((PortStatusMessage) message);
138             } else {
139                 LOG.warn("message listening not supported for type: {}", message.getClass());
140             }
141         } else if (message instanceof OfHeader) {
142             LOG.debug("OF header msg received");
143
144             if (alienMessageListener != null && alienMessageListener.onAlienMessage((OfHeader) message)) {
145                 LOG.debug("Alien message {} received", message.implementedInterface());
146             } else if (outputManager == null || !outputManager.onMessage((OfHeader) message)
147                     || message instanceof EchoOutput) {
148                 final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
149                 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
150                 if (listener != null) {
151                     LOG.debug("Corresponding rpcFuture found");
152                     listener.completed((OfHeader) message);
153                     LOG.debug("After setting rpcFuture");
154                     responseCache.invalidate(key);
155                 }
156             }
157         } else {
158             LOG.warn("message listening not supported for type: {}", message.getClass());
159         }
160     }
161
162     private static RpcResponseKey createRpcResponseKey(final OfHeader message) {
163         return new RpcResponseKey(message.getXid(), message.implementedInterface().getName());
164     }
165
166     @Override
167     public void checkListeners() {
168         final StringBuilder buffer =  new StringBuilder();
169         if (systemListener == null) {
170             buffer.append("SystemListener ");
171         }
172         if (messageListener == null) {
173             buffer.append("MessageListener ");
174         }
175         if (connectionReadyListener == null) {
176             buffer.append("ConnectionReadyListener ");
177         }
178
179         Preconditions.checkState(buffer.length() == 0, "Missing listeners: %s", buffer.toString());
180     }
181
182     @Override
183     public void fireConnectionReadyNotification() {
184         versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
185         Preconditions.checkState(versionDetector != null);
186
187         new Thread(() -> connectionReadyListener.onConnectionReady()).start();
188     }
189
190     @Override
191     public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
192             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
193         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
194
195         final AbstractOutboundQueueManager<T, ?> ret;
196         if (useBarrier) {
197             ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
198         } else {
199             LOG.warn("OutboundQueueManager without barrier is started.");
200             ret = new OutboundQueueManagerNoBarrier<>(this, address, handler);
201         }
202
203         outputManager = ret;
204         /* we don't need it anymore */
205         channel.pipeline().remove(output);
206         // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
207         // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
208         // cause problems because we are shutting down the queue before Openflowplugin knows about it.
209         channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
210                 PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
211
212         return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
213             @Override
214             protected void removeRegistration() {
215                 outputManager.close();
216                 channel.pipeline().remove(outputManager);
217                 outputManager = null;
218             }
219         };
220     }
221
222     Channel getChannel() {
223         return channel;
224     }
225
226     @Override
227     public void setPacketInFiltering(final boolean enabled) {
228         versionDetector.setFilterPacketIns(enabled);
229         LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
230     }
231 }