Allow any hello message and extend hello support for v1.4, v1.5
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ConnectionAdapterImpl.java
1 /*
2  * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9
10 package org.opendaylight.openflowjava.protocol.impl.core.connection;
11
12 import com.google.common.base.Preconditions;
13 import io.netty.channel.Channel;
14 import java.net.InetSocketAddress;
15 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
17 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
18 import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector;
19 import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
33 import org.opendaylight.yangtools.yang.binding.DataObject;
34 import org.opendaylight.yangtools.yang.binding.Notification;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * Handles messages (notifications + rpcs) and connections.
40  * @author mirehak
41  * @author michal.polkorab
42  */
43 public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics implements ConnectionFacade {
44
45     private static final Logger LOG = LoggerFactory.getLogger(ConnectionAdapterImpl.class);
46
47     private ConnectionReadyListener connectionReadyListener;
48     private OpenflowProtocolListener messageListener;
49     private SystemNotificationsListener systemListener;
50     private AbstractOutboundQueueManager<?, ?> outputManager;
51     private OFVersionDetector versionDetector;
52
53     private final boolean useBarrier;
54
55     /**
56      * Default constructor.
57      * @param channel the channel to be set - used for communication
58      * @param address client address (used only in case of UDP communication,
59      *                as there is no need to store address over tcp (stable channel))
60      * @param useBarrier value is configurable by configSubsytem
61      */
62     public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier) {
63         super(channel, address);
64         this.useBarrier = useBarrier;
65         LOG.debug("ConnectionAdapter created");
66     }
67
68     @Override
69     public void setMessageListener(final OpenflowProtocolListener messageListener) {
70         this.messageListener = messageListener;
71     }
72
73     @Override
74     public void setConnectionReadyListener(final ConnectionReadyListener connectionReadyListener) {
75         this.connectionReadyListener = connectionReadyListener;
76     }
77
78     @Override
79     public void setSystemListener(final SystemNotificationsListener systemListener) {
80         this.systemListener = systemListener;
81     }
82
83     @Override
84     public void consumeDeviceMessage(final DataObject message) {
85         LOG.debug("ConsumeIntern msg on {}", channel);
86         if (disconnectOccured) {
87             return;
88         }
89         if (message instanceof Notification) {
90
91             // System events
92             if (message instanceof DisconnectEvent) {
93                 systemListener.onDisconnectEvent((DisconnectEvent) message);
94                 responseCache.invalidateAll();
95                 disconnectOccured = true;
96             } else if (message instanceof SwitchIdleEvent) {
97                 systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
98             // OpenFlow messages
99             } else if (message instanceof EchoRequestMessage) {
100                 if (outputManager != null) {
101                     outputManager.onEchoRequest((EchoRequestMessage) message);
102                 } else {
103                     messageListener.onEchoRequestMessage((EchoRequestMessage) message);
104                 }
105             } else if (message instanceof ErrorMessage) {
106                 // Send only unmatched errors
107                 if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
108                     messageListener.onErrorMessage((ErrorMessage) message);
109                 }
110             } else if (message instanceof ExperimenterMessage) {
111                 if (outputManager != null) {
112                     outputManager.onMessage((OfHeader) message);
113                 }
114                 messageListener.onExperimenterMessage((ExperimenterMessage) message);
115             } else if (message instanceof FlowRemovedMessage) {
116                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
117             } else if (message instanceof HelloMessage) {
118                 LOG.info("Hello received");
119                 messageListener.onHelloMessage((HelloMessage) message);
120             } else if (message instanceof MultipartReplyMessage) {
121                 if (outputManager != null) {
122                     outputManager.onMessage((OfHeader) message);
123                 }
124                 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
125             } else if (message instanceof PacketInMessage) {
126                 messageListener.onPacketInMessage((PacketInMessage) message);
127             } else if (message instanceof PortStatusMessage) {
128                 messageListener.onPortStatusMessage((PortStatusMessage) message);
129             } else {
130                 LOG.warn("message listening not supported for type: {}", message.getClass());
131             }
132         } else if (message instanceof OfHeader) {
133             LOG.debug("OF header msg received");
134
135             if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
136                 final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
137                 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
138                 if (listener != null) {
139                     LOG.debug("Corresponding rpcFuture found");
140                     listener.completed((OfHeader)message);
141                     LOG.debug("After setting rpcFuture");
142                     responseCache.invalidate(key);
143                 } else {
144                     LOG.warn("received unexpected rpc response: {}", key);
145                 }
146             }
147         } else {
148             LOG.warn("message listening not supported for type: {}", message.getClass());
149         }
150     }
151
152     private static RpcResponseKey createRpcResponseKey(final OfHeader message) {
153         return new RpcResponseKey(message.getXid(), message.getImplementedInterface().getName());
154     }
155
156     @Override
157     public void checkListeners() {
158         final StringBuilder buffer =  new StringBuilder();
159         if (systemListener == null) {
160             buffer.append("SystemListener ");
161         }
162         if (messageListener == null) {
163             buffer.append("MessageListener ");
164         }
165         if (connectionReadyListener == null) {
166             buffer.append("ConnectionReadyListener ");
167         }
168
169         Preconditions.checkState(buffer.length() == 0, "Missing listeners: %s", buffer.toString());
170     }
171
172     @Override
173     public void fireConnectionReadyNotification() {
174         versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
175         Preconditions.checkState(versionDetector != null);
176
177         new Thread(new Runnable() {
178             @Override
179             public void run() {
180                 connectionReadyListener.onConnectionReady();
181             }
182         }).start();
183     }
184
185     @Override
186     public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
187             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
188         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
189
190         final AbstractOutboundQueueManager<T, ?> ret;
191         if (useBarrier) {
192             ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
193         } else {
194             LOG.warn("OutboundQueueManager without barrier is started.");
195             ret = new OutboundQueueManagerNoBarrier<>(this, address, handler);
196         }
197
198         outputManager = ret;
199         /* we don't need it anymore */
200         channel.pipeline().remove(output);
201         // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
202         // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
203         // cause problems because we are shutting down the queue before Openflowplugin knows about it.
204         channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
205                 PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
206
207         return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
208             @Override
209             protected void removeRegistration() {
210                 outputManager.close();
211                 channel.pipeline().remove(outputManager);
212                 outputManager = null;
213             }
214         };
215     }
216
217     Channel getChannel() {
218         return channel;
219     }
220
221     @Override
222     public void setPacketInFiltering(final boolean enabled) {
223         versionDetector.setFilterPacketIns(enabled);
224         LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
225     }
226 }