/*
 * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
10 package org.opendaylight.openflowjava.protocol.impl.core;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.channel.EventLoopGroup;
18 import io.netty.channel.epoll.Epoll;
19 import org.checkerframework.checker.nullness.qual.Nullable;
20 import org.opendaylight.infrautils.diagstatus.ServiceState;
21 import org.opendaylight.infrautils.utils.concurrent.Executors;
22 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
23 import org.opendaylight.openflowjava.protocol.api.connection.OpenflowDiagStatusProvider;
24 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
25 import org.opendaylight.openflowjava.protocol.api.extensibility.DeserializerRegistry;
26 import org.opendaylight.openflowjava.protocol.api.extensibility.OFDeserializer;
27 import org.opendaylight.openflowjava.protocol.api.extensibility.OFGeneralDeserializer;
28 import org.opendaylight.openflowjava.protocol.api.extensibility.OFGeneralSerializer;
29 import org.opendaylight.openflowjava.protocol.api.extensibility.OFSerializer;
30 import org.opendaylight.openflowjava.protocol.api.extensibility.SerializerRegistry;
31 import org.opendaylight.openflowjava.protocol.api.keys.ActionSerializerKey;
32 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterActionDeserializerKey;
33 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterDeserializerKey;
34 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterIdDeserializerKey;
35 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterIdMeterSubTypeSerializerKey;
36 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterIdSerializerKey;
37 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterInstructionDeserializerKey;
38 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterSerializerKey;
39 import org.opendaylight.openflowjava.protocol.api.keys.InstructionSerializerKey;
40 import org.opendaylight.openflowjava.protocol.api.keys.MatchEntryDeserializerKey;
41 import org.opendaylight.openflowjava.protocol.api.keys.MatchEntrySerializerKey;
42 import org.opendaylight.openflowjava.protocol.api.keys.MessageCodeKey;
43 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
44 import org.opendaylight.openflowjava.protocol.api.keys.TypeToClassKey;
45 import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
46 import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializerRegistryImpl;
47 import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
48 import org.opendaylight.openflowjava.protocol.impl.serialization.SerializerRegistryImpl;
49 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.TransportProtocol;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev150225.MatchField;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev150225.OxmClassBase;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.meter.band.header.meter.band.MeterBandExperimenterCase;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.queue.property.header.QueueProperty;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.table.features.properties.grouping.TableFeatureProperties;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
62 * Exposed class for server handling. <br>
63 * C - {@link MatchEntrySerializerKey} parameter representing oxm_class (see specification)<br>
64 * F - {@link MatchEntrySerializerKey} parameter representing oxm_field (see specification)
66 * @author michal.polkorab
68 public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, ConnectionInitializer {
70 private static final Logger LOG = LoggerFactory.getLogger(SwitchConnectionProviderImpl.class);
71 private static final String THREAD_NAME_PREFIX = "OFP-SwitchConnectionProvider-Udp/TcpHandler";
72 private static final String OPENFLOW_JAVA_SERVICE_NAME_PREFIX = "OPENFLOW_SERVER";
74 private SwitchConnectionHandler switchConnectionHandler;
75 private ServerFacade serverFacade;
76 private final ConnectionConfiguration connConfig;
77 private final SerializationFactory serializationFactory;
78 private final SerializerRegistry serializerRegistry;
79 private final DeserializerRegistry deserializerRegistry;
80 private final DeserializationFactory deserializationFactory;
81 private final ListeningExecutorService listeningExecutorService;
82 private final String diagStatusIdentifier;
83 private final String threadName;
84 private TcpConnectionInitializer connectionInitializer;
85 private OpenflowDiagStatusProvider openflowDiagStatusProvider;
87 public SwitchConnectionProviderImpl(
88 @Nullable ConnectionConfiguration connConfig, OpenflowDiagStatusProvider openflowDiagStatusProvider) {
89 this.connConfig = connConfig;
90 String connectionSuffix = createConnectionSuffix(connConfig);
91 this.diagStatusIdentifier = OPENFLOW_JAVA_SERVICE_NAME_PREFIX + connectionSuffix;
92 this.openflowDiagStatusProvider = openflowDiagStatusProvider;
93 this.threadName = THREAD_NAME_PREFIX + connectionSuffix;
94 this.listeningExecutorService = Executors.newListeningSingleThreadExecutor(threadName, LOG);
95 serializerRegistry = new SerializerRegistryImpl();
96 if (connConfig != null) {
97 serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled());
99 serializerRegistry.init();
100 serializationFactory = new SerializationFactory(serializerRegistry);
101 deserializerRegistry = new DeserializerRegistryImpl();
102 deserializerRegistry.init();
103 deserializationFactory = new DeserializationFactory(deserializerRegistry);
106 // ID based, on configuration, used for diagstatus serviceIdentifier (ServiceDescriptor moduleServiceName)
107 private static String createConnectionSuffix(@Nullable ConnectionConfiguration config) {
108 if (config != null && config.getAddress() != null) {
109 return "-" + config.getAddress().toString() + "_" + config.getPort();
110 } else if (config != null) {
111 return "_" + config.getPort();
113 return "-null-config";
118 public void setSwitchConnectionHandler(final SwitchConnectionHandler switchConnectionHandler) {
119 LOG.debug("setSwitchConnectionHandler");
120 this.switchConnectionHandler = switchConnectionHandler;
124 public ListenableFuture<Boolean> shutdown() {
125 LOG.debug("Shutdown summoned");
126 if (serverFacade == null) {
127 LOG.warn("Can not shutdown - not configured or started");
128 throw new IllegalStateException("SwitchConnectionProvider is not started or not configured.");
130 ListenableFuture<Boolean> serverFacadeShutdownFuture = serverFacade.shutdown();
131 Executors.shutdownAndAwaitTermination(listeningExecutorService);
132 return serverFacadeShutdownFuture;
136 @SuppressWarnings("checkstyle:IllegalCatch")
137 public ListenableFuture<Boolean> startup() {
138 LOG.debug("Startup summoned");
140 serverFacade = createAndConfigureServer();
141 if (switchConnectionHandler == null) {
142 throw new IllegalStateException("SwitchConnectionHandler is not set");
144 Futures.addCallback(listeningExecutorService.submit(serverFacade), new FutureCallback<Object>() {
147 public void onFailure(Throwable throwable) {
148 openflowDiagStatusProvider.reportStatus(diagStatusIdentifier, throwable);
152 public void onSuccess(@Nullable Object nullResult) {
153 openflowDiagStatusProvider.reportStatus(diagStatusIdentifier, ServiceState.ERROR,
154 threadName + " terminated");
156 } , MoreExecutors.directExecutor());
157 return serverFacade.getIsOnlineFuture();
158 } catch (RuntimeException e) {
159 return Futures.immediateFailedFuture(e);
163 private ServerFacade createAndConfigureServer() {
164 LOG.debug("Configuring ..");
166 final ChannelInitializerFactory factory = new ChannelInitializerFactory();
167 factory.setSwitchConnectionHandler(switchConnectionHandler);
168 factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout());
169 factory.setTlsConfig(connConfig.getTlsConfiguration());
170 factory.setSerializationFactory(serializationFactory);
171 factory.setDeserializationFactory(deserializationFactory);
172 factory.setUseBarrier(connConfig.useBarrier());
173 factory.setChannelOutboundQueueSize(connConfig.getChannelOutboundQueueSize());
174 final TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
175 // Check if Epoll native transport is available.
176 // TODO : Add option to disable Epoll.
177 boolean isEpollEnabled = Epoll.isAvailable();
179 if (TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol)) {
180 server = new TcpHandler(connConfig.getAddress(), connConfig.getPort(), () ->
181 openflowDiagStatusProvider.reportStatus(diagStatusIdentifier, ServiceState.OPERATIONAL));
182 final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer();
183 ((TcpHandler) server).setChannelInitializer(channelInitializer);
184 ((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
185 final EventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup();
186 connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, isEpollEnabled);
187 connectionInitializer.setChannelInitializer(channelInitializer);
188 connectionInitializer.run();
189 } else if (TransportProtocol.UDP.equals(transportProtocol)) {
190 server = new UdpHandler(connConfig.getAddress(), connConfig.getPort(), () ->
191 openflowDiagStatusProvider.reportStatus(diagStatusIdentifier, ServiceState.OPERATIONAL));
192 ((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
193 ((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer());
195 throw new IllegalStateException("Unknown transport protocol received: " + transportProtocol);
197 server.setThreadConfig(connConfig.getThreadConfiguration());
201 public ServerFacade getServerFacade() {
206 public void close() throws Exception {
211 public boolean unregisterSerializer(final ExperimenterSerializerKey key) {
212 return serializerRegistry.unregisterSerializer((MessageTypeKey<?>) key);
216 public boolean unregisterDeserializer(final ExperimenterDeserializerKey key) {
217 return deserializerRegistry.unregisterDeserializer((MessageCodeKey) key);
221 public void registerActionSerializer(final ActionSerializerKey<?> key,
222 final OFGeneralSerializer serializer) {
223 serializerRegistry.registerSerializer(key, serializer);
227 public void registerActionDeserializer(final ExperimenterActionDeserializerKey key,
228 final OFGeneralDeserializer deserializer) {
229 deserializerRegistry.registerDeserializer(key, deserializer);
233 public void registerInstructionSerializer(final InstructionSerializerKey<?> key,
234 final OFGeneralSerializer serializer) {
235 serializerRegistry.registerSerializer(key, serializer);
239 public void registerInstructionDeserializer(final ExperimenterInstructionDeserializerKey key,
240 final OFGeneralDeserializer deserializer) {
241 deserializerRegistry.registerDeserializer(key, deserializer);
245 public <C extends OxmClassBase, F extends MatchField> void registerMatchEntrySerializer(
246 final MatchEntrySerializerKey<C, F> key, final OFGeneralSerializer serializer) {
247 serializerRegistry.registerSerializer(key, serializer);
251 public void registerMatchEntryDeserializer(final MatchEntryDeserializerKey key,
252 final OFGeneralDeserializer deserializer) {
253 deserializerRegistry.registerDeserializer(key, deserializer);
257 public void registerErrorDeserializer(final ExperimenterIdDeserializerKey key,
258 final OFDeserializer<ErrorMessage> deserializer) {
259 deserializerRegistry.registerDeserializer(key, deserializer);
263 public void registerExperimenterMessageDeserializer(ExperimenterIdDeserializerKey key,
264 OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
265 deserializerRegistry.registerDeserializer(key, deserializer);
269 public void registerMultipartReplyMessageDeserializer(ExperimenterIdDeserializerKey key,
270 OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
271 deserializerRegistry.registerDeserializer(key, deserializer);
275 public void registerMultipartReplyTFDeserializer(final ExperimenterIdDeserializerKey key,
276 final OFGeneralDeserializer deserializer) {
277 deserializerRegistry.registerDeserializer(key, deserializer);
281 public void registerQueuePropertyDeserializer(final ExperimenterIdDeserializerKey key,
282 final OFDeserializer<QueueProperty> deserializer) {
283 deserializerRegistry.registerDeserializer(key, deserializer);
287 public void registerMeterBandDeserializer(final ExperimenterIdDeserializerKey key,
288 final OFDeserializer<MeterBandExperimenterCase> deserializer) {
289 deserializerRegistry.registerDeserializer(key, deserializer);
293 public void registerExperimenterMessageSerializer(
294 ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
295 OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
296 serializerRegistry.registerSerializer(key, serializer);
300 public void registerMultipartRequestSerializer(ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
301 OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
302 serializerRegistry.registerSerializer(key, serializer);
306 public void registerMultipartRequestTFSerializer(final ExperimenterIdSerializerKey<TableFeatureProperties> key,
307 final OFGeneralSerializer serializer) {
308 serializerRegistry.registerSerializer(key, serializer);
314 * @deprecated Since we have used ExperimenterIdMeterSubTypeSerializerKey as MeterBandSerializer's key, in order
315 * to avoid the occurrence of an error, we should discard this function.
319 public void registerMeterBandSerializer(final ExperimenterIdSerializerKey<MeterBandExperimenterCase> key,
320 final OFSerializer<MeterBandExperimenterCase> serializer) {
321 serializerRegistry.registerSerializer(key, serializer);
325 public void registerMeterBandSerializer(
326 final ExperimenterIdMeterSubTypeSerializerKey<MeterBandExperimenterCase> key,
327 final OFSerializer<MeterBandExperimenterCase> serializer) {
328 serializerRegistry.registerSerializer(key, serializer);
332 public void initiateConnection(final String host, final int port) {
333 connectionInitializer.initiateConnection(host, port);
337 public ConnectionConfiguration getConfiguration() {
338 return this.connConfig;
342 public <K> void registerSerializer(MessageTypeKey<K> key, OFGeneralSerializer serializer) {
343 serializerRegistry.registerSerializer(key, serializer);
347 public void registerDeserializer(MessageCodeKey key, OFGeneralDeserializer deserializer) {
348 deserializerRegistry.registerDeserializer(key, deserializer);
352 public void registerDeserializerMapping(final TypeToClassKey key, final Class<?> clazz) {
353 deserializationFactory.registerMapping(key, clazz);
357 public boolean unregisterDeserializerMapping(final TypeToClassKey key) {
358 return deserializationFactory.unregisterMapping(key);