2 * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.openflowjava.protocol.impl.core.connection;
12 import com.google.common.base.Preconditions;
13 import io.netty.channel.Channel;
14 import java.math.BigInteger;
15 import java.net.InetSocketAddress;
16 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
17 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
18 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
19 import org.opendaylight.openflowjava.protocol.api.extensibility.AlienMessageListener;
20 import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector;
21 import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.Notification;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * Handles messages (notifications + rpcs) and connections.
44 * @author michal.polkorab
46 public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics implements ConnectionFacade {
48 private static final Logger LOG = LoggerFactory.getLogger(ConnectionAdapterImpl.class);
50 private ConnectionReadyListener connectionReadyListener;
51 private OpenflowProtocolListener messageListener;
52 private SystemNotificationsListener systemListener;
53 private AlienMessageListener alienMessageListener;
54 private AbstractOutboundQueueManager<?, ?> outputManager;
55 private OFVersionDetector versionDetector;
56 private BigInteger datapathId;
58 private final boolean useBarrier;
61 * Default constructor.
62 * @param channel the channel to be set - used for communication
63 * @param address client address (used only in case of UDP communication,
64 * as there is no need to store address over tcp (stable channel))
65 * @param useBarrier value is configurable by configSubsytem
67 public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier,
68 final int channelOutboundQueueSize) {
69 super(channel, address, channelOutboundQueueSize);
70 this.useBarrier = useBarrier;
71 LOG.debug("ConnectionAdapter created");
75 public void setMessageListener(final OpenflowProtocolListener messageListener) {
76 this.messageListener = messageListener;
80 public void setConnectionReadyListener(final ConnectionReadyListener connectionReadyListener) {
81 this.connectionReadyListener = connectionReadyListener;
85 public void setSystemListener(final SystemNotificationsListener systemListener) {
86 this.systemListener = systemListener;
90 public void setAlienMessageListener(final AlienMessageListener alienMessageListener) {
91 this.alienMessageListener = alienMessageListener;
95 public void consumeDeviceMessage(final DataObject message) {
96 LOG.debug("ConsumeIntern msg {} for dpn {} on {}", message.implementedInterface().getSimpleName(),
98 LOG.trace("ConsumeIntern msg {}", message);
99 if (disconnectOccured) {
102 if (message instanceof Notification) {
105 if (message instanceof DisconnectEvent) {
106 systemListener.onDisconnectEvent((DisconnectEvent) message);
107 responseCache.invalidateAll();
108 disconnectOccured = true;
109 } else if (message instanceof SwitchIdleEvent) {
110 systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
112 } else if (message instanceof EchoRequestMessage) {
113 if (outputManager != null) {
114 outputManager.onEchoRequest((EchoRequestMessage) message, datapathId);
116 messageListener.onEchoRequestMessage((EchoRequestMessage) message);
118 } else if (message instanceof ErrorMessage) {
119 // Send only unmatched errors
120 if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
121 messageListener.onErrorMessage((ErrorMessage) message);
123 } else if (message instanceof ExperimenterMessage) {
124 if (outputManager != null) {
125 outputManager.onMessage((OfHeader) message);
127 messageListener.onExperimenterMessage((ExperimenterMessage) message);
128 } else if (message instanceof FlowRemovedMessage) {
129 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
130 } else if (message instanceof HelloMessage) {
131 LOG.info("Hello received");
132 messageListener.onHelloMessage((HelloMessage) message);
133 } else if (message instanceof MultipartReplyMessage) {
134 if (outputManager != null) {
135 outputManager.onMessage((OfHeader) message);
137 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
138 } else if (message instanceof PacketInMessage) {
139 messageListener.onPacketInMessage((PacketInMessage) message);
140 } else if (message instanceof PortStatusMessage) {
141 messageListener.onPortStatusMessage((PortStatusMessage) message);
143 LOG.warn("message listening not supported for type: {}", message.getClass());
145 } else if (message instanceof OfHeader) {
146 LOG.debug("OF header msg received");
148 if (alienMessageListener != null && alienMessageListener.onAlienMessage((OfHeader) message)) {
149 LOG.debug("Alien message {} received", message.implementedInterface());
150 } else if (outputManager == null || !outputManager.onMessage((OfHeader) message)
151 || message instanceof EchoOutput) {
152 final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
153 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
154 if (listener != null) {
155 LOG.debug("Corresponding rpcFuture found");
156 listener.completed((OfHeader) message);
157 LOG.debug("After setting rpcFuture");
158 responseCache.invalidate(key);
162 LOG.warn("message listening not supported for type: {}", message.getClass());
166 private static RpcResponseKey createRpcResponseKey(final OfHeader message) {
167 return new RpcResponseKey(message.getXid(), message.implementedInterface().getName());
171 public void checkListeners() {
172 final StringBuilder buffer = new StringBuilder();
173 if (systemListener == null) {
174 buffer.append("SystemListener ");
176 if (messageListener == null) {
177 buffer.append("MessageListener ");
179 if (connectionReadyListener == null) {
180 buffer.append("ConnectionReadyListener ");
183 Preconditions.checkState(buffer.length() == 0, "Missing listeners: %s", buffer.toString());
187 public void fireConnectionReadyNotification() {
188 versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
189 Preconditions.checkState(versionDetector != null);
191 new Thread(() -> connectionReadyListener.onConnectionReady()).start();
195 public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
196 final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
197 Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
199 final AbstractOutboundQueueManager<T, ?> ret;
201 ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
203 LOG.warn("OutboundQueueManager without barrier is started.");
204 ret = new OutboundQueueManagerNoBarrier<>(this, address, handler);
208 /* we don't need it anymore */
209 channel.pipeline().remove(output);
210 // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
211 // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
212 // cause problems because we are shutting down the queue before Openflowplugin knows about it.
213 channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
214 PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
216 return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
218 protected void removeRegistration() {
219 outputManager.close();
220 channel.pipeline().remove(outputManager);
221 outputManager = null;
226 Channel getChannel() {
231 public void setPacketInFiltering(final boolean enabled) {
232 versionDetector.setFilterPacketIns(enabled);
233 LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
237 public void setDatapathId(final BigInteger datapathId) {
238 this.datapathId = datapathId;