Add method to register listener for unknown msg
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ConnectionAdapterImpl.java
1 /*
2  * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9
10 package org.opendaylight.openflowjava.protocol.impl.core.connection;
11
12 import com.google.common.base.Preconditions;
13 import io.netty.channel.Channel;
14 import java.net.InetSocketAddress;
15 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
17 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
18 import org.opendaylight.openflowjava.protocol.api.extensibility.AlienMessageListener;
19 import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector;
20 import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
34 import org.opendaylight.yangtools.yang.binding.DataObject;
35 import org.opendaylight.yangtools.yang.binding.Notification;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Handles messages (notifications + rpcs) and connections.
41  * @author mirehak
42  * @author michal.polkorab
43  */
44 public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics implements ConnectionFacade {
45
46     private static final Logger LOG = LoggerFactory.getLogger(ConnectionAdapterImpl.class);
47
48     private ConnectionReadyListener connectionReadyListener;
49     private OpenflowProtocolListener messageListener;
50     private SystemNotificationsListener systemListener;
51     private AlienMessageListener alienMessageListener;
52     private AbstractOutboundQueueManager<?, ?> outputManager;
53     private OFVersionDetector versionDetector;
54
55     private final boolean useBarrier;
56
57     /**
58      * Default constructor.
59      * @param channel the channel to be set - used for communication
60      * @param address client address (used only in case of UDP communication,
61      *                as there is no need to store address over tcp (stable channel))
62      * @param useBarrier value is configurable by configSubsytem
63      */
64     public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier) {
65         super(channel, address);
66         this.useBarrier = useBarrier;
67         LOG.debug("ConnectionAdapter created");
68     }
69
70     @Override
71     public void setMessageListener(final OpenflowProtocolListener messageListener) {
72         this.messageListener = messageListener;
73     }
74
75     @Override
76     public void setConnectionReadyListener(final ConnectionReadyListener connectionReadyListener) {
77         this.connectionReadyListener = connectionReadyListener;
78     }
79
80     @Override
81     public void setSystemListener(final SystemNotificationsListener systemListener) {
82         this.systemListener = systemListener;
83     }
84
85     @Override
86     public void setAlienMessageListener(final AlienMessageListener alienMessageListener) {
87         this.alienMessageListener = alienMessageListener;
88     }
89
90     @Override
91     public void consumeDeviceMessage(final DataObject message) {
92         LOG.debug("ConsumeIntern msg on {}", channel);
93         if (disconnectOccured) {
94             return;
95         }
96         if (message instanceof Notification) {
97
98             // System events
99             if (message instanceof DisconnectEvent) {
100                 systemListener.onDisconnectEvent((DisconnectEvent) message);
101                 responseCache.invalidateAll();
102                 disconnectOccured = true;
103             } else if (message instanceof SwitchIdleEvent) {
104                 systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
105             // OpenFlow messages
106             } else if (message instanceof EchoRequestMessage) {
107                 if (outputManager != null) {
108                     outputManager.onEchoRequest((EchoRequestMessage) message);
109                 } else {
110                     messageListener.onEchoRequestMessage((EchoRequestMessage) message);
111                 }
112             } else if (message instanceof ErrorMessage) {
113                 // Send only unmatched errors
114                 if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
115                     messageListener.onErrorMessage((ErrorMessage) message);
116                 }
117             } else if (message instanceof ExperimenterMessage) {
118                 if (outputManager != null) {
119                     outputManager.onMessage((OfHeader) message);
120                 }
121                 messageListener.onExperimenterMessage((ExperimenterMessage) message);
122             } else if (message instanceof FlowRemovedMessage) {
123                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
124             } else if (message instanceof HelloMessage) {
125                 LOG.info("Hello received");
126                 messageListener.onHelloMessage((HelloMessage) message);
127             } else if (message instanceof MultipartReplyMessage) {
128                 if (outputManager != null) {
129                     outputManager.onMessage((OfHeader) message);
130                 }
131                 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
132             } else if (message instanceof PacketInMessage) {
133                 messageListener.onPacketInMessage((PacketInMessage) message);
134             } else if (message instanceof PortStatusMessage) {
135                 messageListener.onPortStatusMessage((PortStatusMessage) message);
136             } else {
137                 LOG.warn("message listening not supported for type: {}", message.getClass());
138             }
139         } else if (message instanceof OfHeader) {
140             LOG.debug("OF header msg received");
141             boolean found = false;
142
143             if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
144                 final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
145                 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
146                 if (listener != null) {
147                     found = true;
148                     LOG.debug("Corresponding rpcFuture found");
149                     listener.completed((OfHeader) message);
150                     LOG.debug("After setting rpcFuture");
151                     responseCache.invalidate(key);
152                 }
153             }
154
155             if (!found && alienMessageListener != null) {
156                 LOG.debug("Alien message {} received", message.getImplementedInterface());
157                 alienMessageListener.onAlienMessage((OfHeader) message);
158             }
159         } else {
160             LOG.warn("message listening not supported for type: {}", message.getClass());
161         }
162     }
163
164     private static RpcResponseKey createRpcResponseKey(final OfHeader message) {
165         return new RpcResponseKey(message.getXid(), message.getImplementedInterface().getName());
166     }
167
168     @Override
169     public void checkListeners() {
170         final StringBuilder buffer =  new StringBuilder();
171         if (systemListener == null) {
172             buffer.append("SystemListener ");
173         }
174         if (messageListener == null) {
175             buffer.append("MessageListener ");
176         }
177         if (connectionReadyListener == null) {
178             buffer.append("ConnectionReadyListener ");
179         }
180
181         Preconditions.checkState(buffer.length() == 0, "Missing listeners: %s", buffer.toString());
182     }
183
184     @Override
185     public void fireConnectionReadyNotification() {
186         versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
187         Preconditions.checkState(versionDetector != null);
188
189         new Thread(new Runnable() {
190             @Override
191             public void run() {
192                 connectionReadyListener.onConnectionReady();
193             }
194         }).start();
195     }
196
197     @Override
198     public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
199             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
200         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
201
202         final AbstractOutboundQueueManager<T, ?> ret;
203         if (useBarrier) {
204             ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
205         } else {
206             LOG.warn("OutboundQueueManager without barrier is started.");
207             ret = new OutboundQueueManagerNoBarrier<>(this, address, handler);
208         }
209
210         outputManager = ret;
211         /* we don't need it anymore */
212         channel.pipeline().remove(output);
213         // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
214         // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
215         // cause problems because we are shutting down the queue before Openflowplugin knows about it.
216         channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
217                 PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
218
219         return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
220             @Override
221             protected void removeRegistration() {
222                 outputManager.close();
223                 channel.pipeline().remove(outputManager);
224                 outputManager = null;
225             }
226         };
227     }
228
229     Channel getChannel() {
230         return channel;
231     }
232
233     @Override
234     public void setPacketInFiltering(final boolean enabled) {
235         versionDetector.setFilterPacketIns(enabled);
236         LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
237     }
238 }