Ditch use of SystemNotificationsListener
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ConnectionAdapterImpl.java
1 /*
2  * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9
10 package org.opendaylight.openflowjava.protocol.impl.core.connection;
11
12 import com.google.common.base.Preconditions;
13 import io.netty.channel.Channel;
14 import java.math.BigInteger;
15 import java.net.InetSocketAddress;
16 import java.security.cert.CertificateParsingException;
17 import java.security.cert.X509Certificate;
18 import java.text.SimpleDateFormat;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.TimeZone;
23 import java.util.concurrent.ExecutorService;
24 import java.util.stream.Collectors;
25 import javax.naming.InvalidNameException;
26 import javax.naming.ldap.LdapName;
27 import javax.naming.ldap.Rdn;
28 import javax.security.auth.x500.X500Principal;
29 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
30 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
31 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
32 import org.opendaylight.openflowjava.protocol.api.extensibility.AlienMessageListener;
33 import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector;
34 import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SslConnectionError;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SslConnectionErrorBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927._switch.certificate.IssuerBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927._switch.certificate.SubjectBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.ssl.connection.error.SwitchCertificate;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.ssl.connection.error.SwitchCertificateBuilder;
55 import org.opendaylight.yangtools.yang.binding.DataObject;
56 import org.opendaylight.yangtools.yang.binding.Notification;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 /**
61  * Handles messages (notifications + rpcs) and connections.
62  * @author mirehak
63  * @author michal.polkorab
64  */
65 public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics implements ConnectionFacade {
66     private static final Logger LOG = LoggerFactory.getLogger(ConnectionAdapterImpl.class);
67
68     private ConnectionReadyListener connectionReadyListener;
69     private OpenflowProtocolListener messageListener;
70     private SystemListener systemListener;
71     private AlienMessageListener alienMessageListener;
72     private AbstractOutboundQueueManager<?, ?> outputManager;
73     private OFVersionDetector versionDetector;
74     private BigInteger datapathId;
75     private ExecutorService executorService;
76     private final boolean useBarrier;
77     private X509Certificate switchCertificate;
78
79     /**
80      * Default constructor.
81      * @param channel the channel to be set - used for communication
82      * @param address client address (used only in case of UDP communication,
83      *                as there is no need to store address over tcp (stable channel))
84      * @param useBarrier value is configurable by configSubsytem
85      */
86     public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier,
87                                  final int channelOutboundQueueSize) {
88         super(channel, address, channelOutboundQueueSize);
89         this.useBarrier = useBarrier;
90         LOG.debug("ConnectionAdapter created");
91     }
92
93     @Override
94     public void setMessageListener(final OpenflowProtocolListener messageListener) {
95         this.messageListener = messageListener;
96     }
97
98     @Override
99     public void setConnectionReadyListener(final ConnectionReadyListener connectionReadyListener) {
100         this.connectionReadyListener = connectionReadyListener;
101     }
102
103     @Override
104     public void setSystemListener(final SystemListener systemListener) {
105         this.systemListener = systemListener;
106     }
107
108     @Override
109     public void setAlienMessageListener(final AlienMessageListener alienMessageListener) {
110         this.alienMessageListener = alienMessageListener;
111     }
112
113     @Override
114     public void consumeDeviceMessage(final DataObject message) {
115         LOG.debug("ConsumeIntern msg {} for dpn {} on {}", message.implementedInterface().getSimpleName(),
116                 datapathId, channel);
117         LOG.trace("ConsumeIntern msg {}", message);
118         if (disconnectOccured) {
119             return;
120         }
121         if (message instanceof Notification) {
122             // System events
123             if (message instanceof DisconnectEvent disconnect) {
124                 systemListener.onDisconnect(disconnect);
125                 responseCache.invalidateAll();
126                 disconnectOccured = true;
127             } else if (message instanceof SwitchIdleEvent switchIdle) {
128                 systemListener.onSwitchIdle(switchIdle);
129             } else if (message instanceof SslConnectionError sslError) {
130                 systemListener.onSslConnectionError(new SslConnectionErrorBuilder()
131                     .setInfo(sslError.getInfo())
132                     .setSwitchCertificate(buildSwitchCertificate())
133                     .build());
134             // OpenFlow messages
135             } else if (message instanceof EchoRequestMessage echoRequest) {
136                 if (outputManager != null) {
137                     outputManager.onEchoRequest(echoRequest, datapathId);
138                 } else {
139                     messageListener.onEchoRequestMessage(echoRequest);
140                 }
141             } else if (message instanceof ErrorMessage error) {
142                 // Send only unmatched errors
143                 if (outputManager == null || !outputManager.onMessage(error)) {
144                     messageListener.onErrorMessage(error);
145                 }
146             } else if (message instanceof ExperimenterMessage experimenter) {
147                 if (outputManager != null) {
148                     outputManager.onMessage(experimenter);
149                 }
150                 messageListener.onExperimenterMessage(experimenter);
151             } else if (message instanceof FlowRemovedMessage flowRemoved) {
152                 messageListener.onFlowRemovedMessage(flowRemoved);
153             } else if (message instanceof HelloMessage hello) {
154                 LOG.info("Hello received");
155                 messageListener.onHelloMessage(hello);
156             } else if (message instanceof MultipartReplyMessage multipartReply) {
157                 if (outputManager != null) {
158                     outputManager.onMessage(multipartReply);
159                 }
160                 messageListener.onMultipartReplyMessage(multipartReply);
161             } else if (message instanceof PacketInMessage packetIn) {
162                 messageListener.onPacketInMessage(packetIn);
163             } else if (message instanceof PortStatusMessage portStatus) {
164                 messageListener.onPortStatusMessage(portStatus);
165             } else {
166                 LOG.warn("message listening not supported for type: {}", message.getClass());
167             }
168         } else if (message instanceof OfHeader header) {
169             LOG.debug("OF header msg received");
170
171             if (alienMessageListener != null && alienMessageListener.onAlienMessage(header)) {
172                 LOG.debug("Alien message {} received", header.implementedInterface());
173             } else if (outputManager == null || !outputManager.onMessage(header) || header instanceof EchoOutput) {
174                 final RpcResponseKey key = createRpcResponseKey(header);
175                 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
176                 if (listener != null) {
177                     LOG.debug("Corresponding rpcFuture found");
178                     listener.completed(header);
179                     LOG.debug("After setting rpcFuture");
180                     responseCache.invalidate(key);
181                 }
182             }
183         } else {
184             LOG.warn("message listening not supported for type: {}", message.getClass());
185         }
186     }
187
188     private static RpcResponseKey createRpcResponseKey(final OfHeader message) {
189         return new RpcResponseKey(message.getXid().toJava(), message.implementedInterface().getName());
190     }
191
192     @Override
193     public void checkListeners() {
194         final StringBuilder buffer =  new StringBuilder();
195         if (systemListener == null) {
196             buffer.append("SystemListener ");
197         }
198         if (messageListener == null) {
199             buffer.append("MessageListener ");
200         }
201         if (connectionReadyListener == null) {
202             buffer.append("ConnectionReadyListener ");
203         }
204
205         Preconditions.checkState(buffer.length() == 0, "Missing listeners: %s", buffer.toString());
206     }
207
208     @Override
209     public void fireConnectionReadyNotification() {
210         versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
211         Preconditions.checkState(versionDetector != null);
212         executorService.execute(() -> connectionReadyListener.onConnectionReady());
213     }
214
215     @Override
216     public void onSwitchCertificateIdentified(final List<X509Certificate> certificateChain) {
217         if (certificateChain != null && !certificateChain.isEmpty()) {
218             switchCertificate = certificateChain.get(0);
219         }
220     }
221
222     @Override
223     public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
224             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
225         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
226
227         final AbstractOutboundQueueManager<T, ?> ret;
228         if (useBarrier) {
229             ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
230         } else {
231             LOG.warn("OutboundQueueManager without barrier is started.");
232             ret = new OutboundQueueManagerNoBarrier<>(this, address, handler);
233         }
234
235         outputManager = ret;
236         /* we don't need it anymore */
237         channel.pipeline().remove(output);
238         // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
239         // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
240         // cause problems because we are shutting down the queue before Openflowplugin knows about it.
241         channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
242                 PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
243
244         return new OutboundQueueHandlerRegistrationImpl<>(handler) {
245             @Override
246             protected void removeRegistration() {
247                 outputManager.close();
248                 channel.pipeline().remove(outputManager);
249                 outputManager = null;
250             }
251         };
252     }
253
254     Channel getChannel() {
255         return channel;
256     }
257
258     @Override
259     public void setPacketInFiltering(final boolean enabled) {
260         versionDetector.setFilterPacketIns(enabled);
261         LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
262     }
263
264     private SwitchCertificate buildSwitchCertificate() {
265         if (switchCertificate == null) {
266             return null;
267         }
268
269         final var builder = new SwitchCertificateBuilder();
270         final var subjectMap = indexRds(switchCertificate.getSubjectX500Principal());
271         if (subjectMap != null) {
272             builder.setSubject(new SubjectBuilder()
273                 .setCommonName(subjectMap.get("CN"))
274                 .setCountry(subjectMap.get("C"))
275                 .setLocality(subjectMap.get("L"))
276                 .setOrganization(subjectMap.get("O"))
277                 .setOrganizationUnit(subjectMap.get("OU"))
278                 .setState(subjectMap.get("ST"))
279                 .build());
280         }
281
282         final var issuerMap = indexRds(switchCertificate.getIssuerX500Principal());
283         if (issuerMap != null) {
284             builder.setIssuer(new IssuerBuilder()
285                 .setCommonName(issuerMap.get("CN"))
286                 .setCountry(issuerMap.get("C"))
287                 .setLocality(issuerMap.get("L"))
288                 .setOrganization(issuerMap.get("O"))
289                 .setOrganizationUnit(issuerMap.get("OU"))
290                 .setState(issuerMap.get("ST"))
291                 .build());
292         }
293
294         Collection<List<?>> altNames = null;
295         try {
296             altNames = switchCertificate.getSubjectAlternativeNames();
297         } catch (CertificateParsingException e) {
298             LOG.error("Cannot parse certificate alternate names", e);
299         }
300         if (altNames != null) {
301             builder.setSubjectAlternateNames(altNames.stream()
302                 .filter(list -> list.size() > 1)
303                 .map(list -> list.get(1))
304                 .filter(String.class::isInstance)
305                 .map(String.class::cast)
306                 .collect(Collectors.toUnmodifiableList()));
307         }
308
309         // FIXME: do not use SimpleDateFormat
310         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'-00:00'");
311         formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
312
313         return builder
314             .setSerialNumber(switchCertificate.getSerialNumber().toString())
315             .setValidFrom(new DateAndTime(formatter.format(switchCertificate.getNotBefore())))
316             .setValidTo(new DateAndTime(formatter.format(switchCertificate.getNotAfter())))
317             .build();
318     }
319
320     @Override
321     public void setDatapathId(final BigInteger datapathId) {
322         this.datapathId = datapathId;
323     }
324
325     @Override
326     public void setExecutorService(final ExecutorService executorService) {
327         this.executorService = executorService;
328     }
329
330     private static Map<String, String> indexRds(final X500Principal principal) {
331         final LdapName name;
332         try {
333             name = new LdapName(principal.getName());
334         } catch (InvalidNameException e) {
335             LOG.error("Cannot parse principal {}", principal, e);
336             return null;
337         }
338         return name.getRdns().stream().collect(Collectors.toMap(Rdn::getType, rdn -> rdn.getValue().toString()));
339     }
340 }