Merge "Sending each flow/group in separate bundle add rpc instead of adding all messa...
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ConnectionAdapterImpl.java
1 /*
2  * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9
10 package org.opendaylight.openflowjava.protocol.impl.core.connection;
11
12 import com.google.common.base.Preconditions;
13 import io.netty.channel.Channel;
14 import java.math.BigInteger;
15 import java.net.InetSocketAddress;
16 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
17 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
18 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
19 import org.opendaylight.openflowjava.protocol.api.extensibility.AlienMessageListener;
20 import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector;
21 import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.Notification;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Handles messages (notifications + rpcs) and connections.
43  * @author mirehak
44  * @author michal.polkorab
45  */
46 public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics implements ConnectionFacade {
47
48     private static final Logger LOG = LoggerFactory.getLogger(ConnectionAdapterImpl.class);
49
50     private ConnectionReadyListener connectionReadyListener;
51     private OpenflowProtocolListener messageListener;
52     private SystemNotificationsListener systemListener;
53     private AlienMessageListener alienMessageListener;
54     private AbstractOutboundQueueManager<?, ?> outputManager;
55     private OFVersionDetector versionDetector;
56     private BigInteger datapathId;
57
58     private final boolean useBarrier;
59
60     /**
61      * Default constructor.
62      * @param channel the channel to be set - used for communication
63      * @param address client address (used only in case of UDP communication,
64      *                as there is no need to store address over tcp (stable channel))
65      * @param useBarrier value is configurable by configSubsytem
66      */
67     public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier,
68                                  final int channelOutboundQueueSize) {
69         super(channel, address, channelOutboundQueueSize);
70         this.useBarrier = useBarrier;
71         LOG.debug("ConnectionAdapter created");
72     }
73
74     @Override
75     public void setMessageListener(final OpenflowProtocolListener messageListener) {
76         this.messageListener = messageListener;
77     }
78
79     @Override
80     public void setConnectionReadyListener(final ConnectionReadyListener connectionReadyListener) {
81         this.connectionReadyListener = connectionReadyListener;
82     }
83
84     @Override
85     public void setSystemListener(final SystemNotificationsListener systemListener) {
86         this.systemListener = systemListener;
87     }
88
89     @Override
90     public void setAlienMessageListener(final AlienMessageListener alienMessageListener) {
91         this.alienMessageListener = alienMessageListener;
92     }
93
94     @Override
95     public void consumeDeviceMessage(final DataObject message) {
96         LOG.debug("ConsumeIntern msg {} for dpn {} on {}", message.implementedInterface().getSimpleName(),
97                 datapathId, channel);
98         LOG.trace("ConsumeIntern msg {}", message);
99         if (disconnectOccured) {
100             return;
101         }
102         if (message instanceof Notification) {
103
104             // System events
105             if (message instanceof DisconnectEvent) {
106                 systemListener.onDisconnectEvent((DisconnectEvent) message);
107                 responseCache.invalidateAll();
108                 disconnectOccured = true;
109             } else if (message instanceof SwitchIdleEvent) {
110                 systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
111             // OpenFlow messages
112             } else if (message instanceof EchoRequestMessage) {
113                 if (outputManager != null) {
114                     outputManager.onEchoRequest((EchoRequestMessage) message, datapathId);
115                 } else {
116                     messageListener.onEchoRequestMessage((EchoRequestMessage) message);
117                 }
118             } else if (message instanceof ErrorMessage) {
119                 // Send only unmatched errors
120                 if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
121                     messageListener.onErrorMessage((ErrorMessage) message);
122                 }
123             } else if (message instanceof ExperimenterMessage) {
124                 if (outputManager != null) {
125                     outputManager.onMessage((OfHeader) message);
126                 }
127                 messageListener.onExperimenterMessage((ExperimenterMessage) message);
128             } else if (message instanceof FlowRemovedMessage) {
129                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
130             } else if (message instanceof HelloMessage) {
131                 LOG.info("Hello received");
132                 messageListener.onHelloMessage((HelloMessage) message);
133             } else if (message instanceof MultipartReplyMessage) {
134                 if (outputManager != null) {
135                     outputManager.onMessage((OfHeader) message);
136                 }
137                 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
138             } else if (message instanceof PacketInMessage) {
139                 messageListener.onPacketInMessage((PacketInMessage) message);
140             } else if (message instanceof PortStatusMessage) {
141                 messageListener.onPortStatusMessage((PortStatusMessage) message);
142             } else {
143                 LOG.warn("message listening not supported for type: {}", message.getClass());
144             }
145         } else if (message instanceof OfHeader) {
146             LOG.debug("OF header msg received");
147
148             if (alienMessageListener != null && alienMessageListener.onAlienMessage((OfHeader) message)) {
149                 LOG.debug("Alien message {} received", message.implementedInterface());
150             } else if (outputManager == null || !outputManager.onMessage((OfHeader) message)
151                     || message instanceof EchoOutput) {
152                 final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
153                 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
154                 if (listener != null) {
155                     LOG.debug("Corresponding rpcFuture found");
156                     listener.completed((OfHeader) message);
157                     LOG.debug("After setting rpcFuture");
158                     responseCache.invalidate(key);
159                 }
160             }
161         } else {
162             LOG.warn("message listening not supported for type: {}", message.getClass());
163         }
164     }
165
166     private static RpcResponseKey createRpcResponseKey(final OfHeader message) {
167         return new RpcResponseKey(message.getXid(), message.implementedInterface().getName());
168     }
169
170     @Override
171     public void checkListeners() {
172         final StringBuilder buffer =  new StringBuilder();
173         if (systemListener == null) {
174             buffer.append("SystemListener ");
175         }
176         if (messageListener == null) {
177             buffer.append("MessageListener ");
178         }
179         if (connectionReadyListener == null) {
180             buffer.append("ConnectionReadyListener ");
181         }
182
183         Preconditions.checkState(buffer.length() == 0, "Missing listeners: %s", buffer.toString());
184     }
185
186     @Override
187     public void fireConnectionReadyNotification() {
188         versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
189         Preconditions.checkState(versionDetector != null);
190
191         new Thread(() -> connectionReadyListener.onConnectionReady()).start();
192     }
193
194     @Override
195     public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
196             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
197         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
198
199         final AbstractOutboundQueueManager<T, ?> ret;
200         if (useBarrier) {
201             ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
202         } else {
203             LOG.warn("OutboundQueueManager without barrier is started.");
204             ret = new OutboundQueueManagerNoBarrier<>(this, address, handler);
205         }
206
207         outputManager = ret;
208         /* we don't need it anymore */
209         channel.pipeline().remove(output);
210         // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
211         // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
212         // cause problems because we are shutting down the queue before Openflowplugin knows about it.
213         channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
214                 PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
215
216         return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
217             @Override
218             protected void removeRegistration() {
219                 outputManager.close();
220                 channel.pipeline().remove(outputManager);
221                 outputManager = null;
222             }
223         };
224     }
225
226     Channel getChannel() {
227         return channel;
228     }
229
230     @Override
231     public void setPacketInFiltering(final boolean enabled) {
232         versionDetector.setFilterPacketIns(enabled);
233         LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
234     }
235
236     @Override
237     public void setDatapathId(final BigInteger datapathId) {
238         this.datapathId = datapathId;
239     }
240 }