Cleanup switch certificate chain handling
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ConnectionAdapterImpl.java
1 /*
2  * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9
10 package org.opendaylight.openflowjava.protocol.impl.core.connection;
11
12 import com.google.common.base.Preconditions;
13 import io.netty.channel.Channel;
14 import java.math.BigInteger;
15 import java.net.InetSocketAddress;
16 import java.security.cert.CertificateParsingException;
17 import java.security.cert.X509Certificate;
18 import java.text.SimpleDateFormat;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.TimeZone;
23 import java.util.concurrent.ExecutorService;
24 import java.util.stream.Collectors;
25 import javax.naming.InvalidNameException;
26 import javax.naming.ldap.LdapName;
27 import javax.naming.ldap.Rdn;
28 import javax.security.auth.x500.X500Principal;
29 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
30 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
31 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
32 import org.opendaylight.openflowjava.protocol.api.extensibility.AlienMessageListener;
33 import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector;
34 import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SslConnectionError;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SslConnectionErrorBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927._switch.certificate.IssuerBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927._switch.certificate.SubjectBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.ssl.connection.error.SwitchCertificate;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.ssl.connection.error.SwitchCertificateBuilder;
56 import org.opendaylight.yangtools.yang.binding.DataObject;
57 import org.opendaylight.yangtools.yang.binding.Notification;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 /**
62  * Handles messages (notifications + rpcs) and connections.
63  * @author mirehak
64  * @author michal.polkorab
65  */
66 public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics implements ConnectionFacade {
67     private static final Logger LOG = LoggerFactory.getLogger(ConnectionAdapterImpl.class);
68
69     private ConnectionReadyListener connectionReadyListener;
70     private OpenflowProtocolListener messageListener;
71     private SystemNotificationsListener systemListener;
72     private AlienMessageListener alienMessageListener;
73     private AbstractOutboundQueueManager<?, ?> outputManager;
74     private OFVersionDetector versionDetector;
75     private BigInteger datapathId;
76     private ExecutorService executorService;
77     private final boolean useBarrier;
78     private X509Certificate switchCertificate;
79
80     /**
81      * Default constructor.
82      * @param channel the channel to be set - used for communication
83      * @param address client address (used only in case of UDP communication,
84      *                as there is no need to store address over tcp (stable channel))
85      * @param useBarrier value is configurable by configSubsytem
86      */
87     public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier,
88                                  final int channelOutboundQueueSize) {
89         super(channel, address, channelOutboundQueueSize);
90         this.useBarrier = useBarrier;
91         LOG.debug("ConnectionAdapter created");
92     }
93
94     @Override
95     public void setMessageListener(final OpenflowProtocolListener messageListener) {
96         this.messageListener = messageListener;
97     }
98
99     @Override
100     public void setConnectionReadyListener(final ConnectionReadyListener connectionReadyListener) {
101         this.connectionReadyListener = connectionReadyListener;
102     }
103
104     @Override
105     public void setSystemListener(final SystemNotificationsListener systemListener) {
106         this.systemListener = systemListener;
107     }
108
109     @Override
110     public void setAlienMessageListener(final AlienMessageListener alienMessageListener) {
111         this.alienMessageListener = alienMessageListener;
112     }
113
114     @Override
115     public void consumeDeviceMessage(final DataObject message) {
116         LOG.debug("ConsumeIntern msg {} for dpn {} on {}", message.implementedInterface().getSimpleName(),
117                 datapathId, channel);
118         LOG.trace("ConsumeIntern msg {}", message);
119         if (disconnectOccured) {
120             return;
121         }
122         if (message instanceof Notification) {
123
124             // System events
125             if (message instanceof DisconnectEvent) {
126                 systemListener.onDisconnectEvent((DisconnectEvent) message);
127                 responseCache.invalidateAll();
128                 disconnectOccured = true;
129             } else if (message instanceof SwitchIdleEvent) {
130                 systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
131             } else if (message instanceof SslConnectionError) {
132                 systemListener.onSslConnectionError(new SslConnectionErrorBuilder()
133                         .setInfo(((SslConnectionError) message).getInfo())
134                         .setSwitchCertificate(buildSwitchCertificate())
135                         .build());
136             // OpenFlow messages
137             } else if (message instanceof EchoRequestMessage) {
138                 if (outputManager != null) {
139                     outputManager.onEchoRequest((EchoRequestMessage) message, datapathId);
140                 } else {
141                     messageListener.onEchoRequestMessage((EchoRequestMessage) message);
142                 }
143             } else if (message instanceof ErrorMessage) {
144                 // Send only unmatched errors
145                 if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
146                     messageListener.onErrorMessage((ErrorMessage) message);
147                 }
148             } else if (message instanceof ExperimenterMessage) {
149                 if (outputManager != null) {
150                     outputManager.onMessage((OfHeader) message);
151                 }
152                 messageListener.onExperimenterMessage((ExperimenterMessage) message);
153             } else if (message instanceof FlowRemovedMessage) {
154                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
155             } else if (message instanceof HelloMessage) {
156                 LOG.info("Hello received");
157                 messageListener.onHelloMessage((HelloMessage) message);
158             } else if (message instanceof MultipartReplyMessage) {
159                 if (outputManager != null) {
160                     outputManager.onMessage((OfHeader) message);
161                 }
162                 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
163             } else if (message instanceof PacketInMessage) {
164                 messageListener.onPacketInMessage((PacketInMessage) message);
165             } else if (message instanceof PortStatusMessage) {
166                 messageListener.onPortStatusMessage((PortStatusMessage) message);
167             } else {
168                 LOG.warn("message listening not supported for type: {}", message.getClass());
169             }
170         } else if (message instanceof OfHeader) {
171             LOG.debug("OF header msg received");
172
173             if (alienMessageListener != null && alienMessageListener.onAlienMessage((OfHeader) message)) {
174                 LOG.debug("Alien message {} received", message.implementedInterface());
175             } else if (outputManager == null || !outputManager.onMessage((OfHeader) message)
176                     || message instanceof EchoOutput) {
177                 final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
178                 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
179                 if (listener != null) {
180                     LOG.debug("Corresponding rpcFuture found");
181                     listener.completed((OfHeader) message);
182                     LOG.debug("After setting rpcFuture");
183                     responseCache.invalidate(key);
184                 }
185             }
186         } else {
187             LOG.warn("message listening not supported for type: {}", message.getClass());
188         }
189     }
190
191     private static RpcResponseKey createRpcResponseKey(final OfHeader message) {
192         return new RpcResponseKey(message.getXid().toJava(), message.implementedInterface().getName());
193     }
194
195     @Override
196     public void checkListeners() {
197         final StringBuilder buffer =  new StringBuilder();
198         if (systemListener == null) {
199             buffer.append("SystemListener ");
200         }
201         if (messageListener == null) {
202             buffer.append("MessageListener ");
203         }
204         if (connectionReadyListener == null) {
205             buffer.append("ConnectionReadyListener ");
206         }
207
208         Preconditions.checkState(buffer.length() == 0, "Missing listeners: %s", buffer.toString());
209     }
210
211     @Override
212     public void fireConnectionReadyNotification() {
213         versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
214         Preconditions.checkState(versionDetector != null);
215         executorService.execute(() -> connectionReadyListener.onConnectionReady());
216     }
217
218     @Override
219     public void onSwitchCertificateIdentified(final List<X509Certificate> certificateChain) {
220         if (certificateChain != null && !certificateChain.isEmpty()) {
221             switchCertificate = certificateChain.get(0);
222         }
223     }
224
225     @Override
226     public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
227             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
228         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
229
230         final AbstractOutboundQueueManager<T, ?> ret;
231         if (useBarrier) {
232             ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
233         } else {
234             LOG.warn("OutboundQueueManager without barrier is started.");
235             ret = new OutboundQueueManagerNoBarrier<>(this, address, handler);
236         }
237
238         outputManager = ret;
239         /* we don't need it anymore */
240         channel.pipeline().remove(output);
241         // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
242         // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
243         // cause problems because we are shutting down the queue before Openflowplugin knows about it.
244         channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
245                 PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
246
247         return new OutboundQueueHandlerRegistrationImpl<>(handler) {
248             @Override
249             protected void removeRegistration() {
250                 outputManager.close();
251                 channel.pipeline().remove(outputManager);
252                 outputManager = null;
253             }
254         };
255     }
256
257     Channel getChannel() {
258         return channel;
259     }
260
261     @Override
262     public void setPacketInFiltering(final boolean enabled) {
263         versionDetector.setFilterPacketIns(enabled);
264         LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
265     }
266
267     private SwitchCertificate buildSwitchCertificate() {
268         if (switchCertificate == null) {
269             return null;
270         }
271
272         final var builder = new SwitchCertificateBuilder();
273         final var subjectMap = indexRds(switchCertificate.getSubjectX500Principal());
274         if (subjectMap != null) {
275             builder.setSubject(new SubjectBuilder()
276                 .setCommonName(subjectMap.get("CN"))
277                 .setCountry(subjectMap.get("C"))
278                 .setLocality(subjectMap.get("L"))
279                 .setOrganization(subjectMap.get("O"))
280                 .setOrganizationUnit(subjectMap.get("OU"))
281                 .setState(subjectMap.get("ST"))
282                 .build());
283         }
284
285         final var issuerMap = indexRds(switchCertificate.getIssuerX500Principal());
286         if (issuerMap != null) {
287             builder.setIssuer(new IssuerBuilder()
288                 .setCommonName(issuerMap.get("CN"))
289                 .setCountry(issuerMap.get("C"))
290                 .setLocality(issuerMap.get("L"))
291                 .setOrganization(issuerMap.get("O"))
292                 .setOrganizationUnit(issuerMap.get("OU"))
293                 .setState(issuerMap.get("ST"))
294                 .build());
295         }
296
297         Collection<List<?>> altNames = null;
298         try {
299             altNames = switchCertificate.getSubjectAlternativeNames();
300         } catch (CertificateParsingException e) {
301             LOG.error("Cannot parse certificate alternate names", e);
302         }
303         if (altNames != null) {
304             builder.setSubjectAlternateNames(altNames.stream()
305                 .filter(list -> list.size() > 1)
306                 .map(list -> list.get(1))
307                 .filter(String.class::isInstance)
308                 .map(String.class::cast)
309                 .collect(Collectors.toUnmodifiableList()));
310         }
311
312         // FIXME: do not use SimpleDateFormat
313         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'-00:00'");
314         formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
315
316         return builder
317             .setSerialNumber(switchCertificate.getSerialNumber().toString())
318             .setValidFrom(new DateAndTime(formatter.format(switchCertificate.getNotBefore())))
319             .setValidTo(new DateAndTime(formatter.format(switchCertificate.getNotAfter())))
320             .build();
321     }
322
323     @Override
324     public void setDatapathId(final BigInteger datapathId) {
325         this.datapathId = datapathId;
326     }
327
328     @Override
329     public void setExecutorService(final ExecutorService executorService) {
330         this.executorService = executorService;
331     }
332
333     private static Map<String, String> indexRds(final X500Principal principal) {
334         final LdapName name;
335         try {
336             name = new LdapName(principal.getName());
337         } catch (InvalidNameException e) {
338             LOG.error("Cannot parse principal {}", principal, e);
339             return null;
340         }
341         return name.getRdns().stream().collect(Collectors.toMap(Rdn::getType, rdn -> rdn.getValue().toString()));
342     }
343 }