Bug 5118 - Unsent messages reported as failed after disconnect
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ConnectionAdapterImpl.java
1 /*
2  * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9
10 package org.opendaylight.openflowjava.protocol.impl.core.connection;
11
12 import com.google.common.base.Preconditions;
13 import io.netty.channel.Channel;
14 import java.net.InetSocketAddress;
15 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
17 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
18 import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector;
19 import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
33 import org.opendaylight.yangtools.yang.binding.DataObject;
34 import org.opendaylight.yangtools.yang.binding.Notification;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * Handles messages (notifications + rpcs) and connections
40  * @author mirehak
41  * @author michal.polkorab
42  */
43 public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics implements ConnectionFacade {
44
45     private static final Logger LOG = LoggerFactory.getLogger(ConnectionAdapterImpl.class);
46
47     private ConnectionReadyListener connectionReadyListener;
48     private OpenflowProtocolListener messageListener;
49     private SystemNotificationsListener systemListener;
50     private AbstractOutboundQueueManager<?, ?> outputManager;
51     private OFVersionDetector versionDetector;
52
53     private final boolean useBarrier;
54
55     /**
56      * default ctor
57      *
58      * @param channel the channel to be set - used for communication
59      * @param address client address (used only in case of UDP communication,
60      *            as there is no need to store address over tcp (stable channel))
61      * @param useBarrier value is configurable by configSubsytem
62      */
63     public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier) {
64         super(channel, address);
65         this.useBarrier = useBarrier;
66         LOG.debug("ConnectionAdapter created");
67     }
68
69     @Override
70     public void setMessageListener(final OpenflowProtocolListener messageListener) {
71         this.messageListener = messageListener;
72     }
73
74     @Override
75     public void setConnectionReadyListener(final ConnectionReadyListener connectionReadyListener) {
76         this.connectionReadyListener = connectionReadyListener;
77     }
78
79     @Override
80     public void setSystemListener(final SystemNotificationsListener systemListener) {
81         this.systemListener = systemListener;
82     }
83
84     @Override
85     public void consumeDeviceMessage(final DataObject message) {
86         LOG.debug("ConsumeIntern msg on {}", channel);
87         if (disconnectOccured ) {
88             return;
89         }
90         if (message instanceof Notification) {
91
92             // System events
93             if (message instanceof DisconnectEvent) {
94                 systemListener.onDisconnectEvent((DisconnectEvent) message);
95                 responseCache.invalidateAll();
96                 disconnectOccured = true;
97             } else if (message instanceof SwitchIdleEvent) {
98                 systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
99                 // OpenFlow messages
100             } else if (message instanceof EchoRequestMessage) {
101                 if (outputManager != null) {
102                     outputManager.onEchoRequest((EchoRequestMessage) message);
103                 } else {
104                     messageListener.onEchoRequestMessage((EchoRequestMessage) message);
105                 }
106             } else if (message instanceof ErrorMessage) {
107                 // Send only unmatched errors
108                 if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
109                     messageListener.onErrorMessage((ErrorMessage) message);
110                 }
111             } else if (message instanceof ExperimenterMessage) {
112                 if (outputManager != null) {
113                     outputManager.onMessage((OfHeader) message);
114                 }
115                 messageListener.onExperimenterMessage((ExperimenterMessage) message);
116             } else if (message instanceof FlowRemovedMessage) {
117                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
118             } else if (message instanceof HelloMessage) {
119                 LOG.info("Hello received / branch");
120                 messageListener.onHelloMessage((HelloMessage) message);
121             } else if (message instanceof MultipartReplyMessage) {
122                 if (outputManager != null) {
123                     outputManager.onMessage((OfHeader) message);
124                 }
125                 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
126             } else if (message instanceof PacketInMessage) {
127                 messageListener.onPacketInMessage((PacketInMessage) message);
128             } else if (message instanceof PortStatusMessage) {
129                 messageListener.onPortStatusMessage((PortStatusMessage) message);
130             } else {
131                 LOG.warn("message listening not supported for type: {}", message.getClass());
132             }
133         } else if (message instanceof OfHeader) {
134             LOG.debug("OFheader msg received");
135
136             if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
137                 final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
138                 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
139                 if (listener != null) {
140                     LOG.debug("corresponding rpcFuture found");
141                     listener.completed((OfHeader)message);
142                     LOG.debug("after setting rpcFuture");
143                     responseCache.invalidate(key);
144                 } else {
145                     LOG.warn("received unexpected rpc response: {}", key);
146                 }
147             }
148         } else {
149             LOG.warn("message listening not supported for type: {}", message.getClass());
150         }
151     }
152
153     /**
154      * @param message
155      * @return
156      */
157     private static RpcResponseKey createRpcResponseKey(final OfHeader message) {
158         return new RpcResponseKey(message.getXid(), message.getImplementedInterface().getName());
159     }
160
161     @Override
162     public void checkListeners() {
163         final StringBuilder buffer =  new StringBuilder();
164         if (systemListener == null) {
165             buffer.append("SystemListener ");
166         }
167         if (messageListener == null) {
168             buffer.append("MessageListener ");
169         }
170         if (connectionReadyListener == null) {
171             buffer.append("ConnectionReadyListener ");
172         }
173
174         Preconditions.checkState(buffer.length() == 0, "Missing listeners: %s", buffer.toString());
175     }
176
177     @Override
178     public void fireConnectionReadyNotification() {
179         versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
180         Preconditions.checkState(versionDetector != null);
181
182         new Thread(new Runnable() {
183             @Override
184             public void run() {
185                 connectionReadyListener.onConnectionReady();
186             }
187         }).start();
188     }
189
190     @Override
191     public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
192             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
193         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
194
195         final AbstractOutboundQueueManager<T, ?> ret;
196         if (useBarrier) {
197             ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
198         } else {
199             LOG.warn("OutboundQueueManager without barrier is started.");
200             ret = new OutboundQueueManagerNoBarrier<>(this, address, handler);
201         }
202
203         outputManager = ret;
204         /* we don't need it anymore */
205         channel.pipeline().remove(output);
206         // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
207         // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
208         // cause problems because we are shutting down the queue before Openflowplugin knows about it.
209         channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
210                 PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
211
212         return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
213             @Override
214             protected void removeRegistration() {
215                 outputManager.close();
216                 channel.pipeline().remove(outputManager);
217                 outputManager = null;
218             }
219         };
220     }
221
222     Channel getChannel() {
223         return channel;
224     }
225
226     @Override
227     public void setPacketInFiltering(final boolean enabled) {
228         versionDetector.setFilterPacketIns(enabled);
229         LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
230     }
231 }