2 * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.openflowjava.protocol.impl.core;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.channel.EventLoopGroup;
18 import io.netty.channel.epoll.Epoll;
19 import org.checkerframework.checker.nullness.qual.Nullable;
20 import org.opendaylight.infrautils.diagstatus.ServiceState;
21 import org.opendaylight.infrautils.utils.concurrent.Executors;
22 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
23 import org.opendaylight.openflowjava.protocol.api.connection.OpenflowDiagStatusProvider;
24 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
25 import org.opendaylight.openflowjava.protocol.api.extensibility.DeserializerRegistry;
26 import org.opendaylight.openflowjava.protocol.api.extensibility.OFDeserializer;
27 import org.opendaylight.openflowjava.protocol.api.extensibility.OFGeneralDeserializer;
28 import org.opendaylight.openflowjava.protocol.api.extensibility.OFGeneralSerializer;
29 import org.opendaylight.openflowjava.protocol.api.extensibility.OFSerializer;
30 import org.opendaylight.openflowjava.protocol.api.extensibility.SerializerRegistry;
31 import org.opendaylight.openflowjava.protocol.api.keys.ActionSerializerKey;
32 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterActionDeserializerKey;
33 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterDeserializerKey;
34 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterIdDeserializerKey;
35 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterIdMeterSubTypeSerializerKey;
36 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterIdSerializerKey;
37 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterInstructionDeserializerKey;
38 import org.opendaylight.openflowjava.protocol.api.keys.ExperimenterSerializerKey;
39 import org.opendaylight.openflowjava.protocol.api.keys.InstructionSerializerKey;
40 import org.opendaylight.openflowjava.protocol.api.keys.MatchEntryDeserializerKey;
41 import org.opendaylight.openflowjava.protocol.api.keys.MatchEntrySerializerKey;
42 import org.opendaylight.openflowjava.protocol.api.keys.MessageCodeKey;
43 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
44 import org.opendaylight.openflowjava.protocol.api.keys.TypeToClassKey;
45 import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
46 import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializerRegistryImpl;
47 import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
48 import org.opendaylight.openflowjava.protocol.impl.serialization.SerializerRegistryImpl;
49 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.TransportProtocol;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev150225.MatchField;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev150225.OxmClassBase;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.meter.band.header.meter.band.MeterBandExperimenterCase;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.queue.property.header.QueueProperty;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.table.features.properties.grouping.TableFeatureProperties;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
/**
 * Exposed class for server handling. <br>
 * C - {@link MatchEntrySerializerKey} parameter representing oxm_class (see specification)<br>
 * F - {@link MatchEntrySerializerKey} parameter representing oxm_field (see specification)
 *
 * @author michal.polkorab
 */
68 public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, ConnectionInitializer {
70 private static final Logger LOG = LoggerFactory.getLogger(SwitchConnectionProviderImpl.class);
71 private static final String THREAD_NAME_PREFIX = "OFP-SwitchConnectionProvider-Udp/TcpHandler";
72 private static final String OPENFLOW_JAVA_SERVICE_NAME_PREFIX = "OPENFLOW_SERVER";
74 private SwitchConnectionHandler switchConnectionHandler;
75 private ServerFacade serverFacade;
76 private final ConnectionConfiguration connConfig;
77 private final SerializationFactory serializationFactory;
78 private final SerializerRegistry serializerRegistry;
79 private final DeserializerRegistry deserializerRegistry;
80 private final DeserializationFactory deserializationFactory;
81 private final ListeningExecutorService listeningExecutorService;
82 private final String diagStatusIdentifier;
83 private final String threadName;
84 private TcpConnectionInitializer connectionInitializer;
85 private OpenflowDiagStatusProvider openflowDiagStatusProvider;
87 public SwitchConnectionProviderImpl(
88 @Nullable ConnectionConfiguration connConfig, OpenflowDiagStatusProvider openflowDiagStatusProvider) {
89 this.connConfig = connConfig;
90 String connectionSuffix = createConnectionSuffix(connConfig);
91 this.diagStatusIdentifier = OPENFLOW_JAVA_SERVICE_NAME_PREFIX + connectionSuffix;
92 this.openflowDiagStatusProvider = openflowDiagStatusProvider;
93 this.threadName = THREAD_NAME_PREFIX + connectionSuffix;
94 this.listeningExecutorService = Executors.newListeningSingleThreadExecutor(threadName, LOG);
95 serializerRegistry = new SerializerRegistryImpl();
96 if (connConfig != null) {
97 serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled());
99 serializerRegistry.init();
100 serializationFactory = new SerializationFactory(serializerRegistry);
101 deserializerRegistry = new DeserializerRegistryImpl();
102 deserializerRegistry.init();
103 deserializationFactory = new DeserializationFactory(deserializerRegistry);
106 // ID based, on configuration, used for diagstatus serviceIdentifier (ServiceDescriptor moduleServiceName)
107 private static String createConnectionSuffix(@Nullable ConnectionConfiguration config) {
108 if (config != null) {
109 return "_" + config.getPort();
111 return "-null-config";
116 public void setSwitchConnectionHandler(final SwitchConnectionHandler switchConnectionHandler) {
117 LOG.debug("setSwitchConnectionHandler");
118 this.switchConnectionHandler = switchConnectionHandler;
122 public ListenableFuture<Boolean> shutdown() {
123 LOG.debug("Shutdown summoned");
124 if (serverFacade == null) {
125 LOG.warn("Can not shutdown - not configured or started");
126 throw new IllegalStateException("SwitchConnectionProvider is not started or not configured.");
128 ListenableFuture<Boolean> serverFacadeShutdownFuture = serverFacade.shutdown();
129 Executors.shutdownAndAwaitTermination(listeningExecutorService);
130 return serverFacadeShutdownFuture;
134 @SuppressWarnings("checkstyle:IllegalCatch")
135 public ListenableFuture<Boolean> startup() {
136 LOG.debug("Startup summoned");
138 serverFacade = createAndConfigureServer();
139 if (switchConnectionHandler == null) {
140 throw new IllegalStateException("SwitchConnectionHandler is not set");
142 Futures.addCallback(listeningExecutorService.submit(serverFacade), new FutureCallback<Object>() {
145 public void onFailure(Throwable throwable) {
146 openflowDiagStatusProvider.reportStatus(diagStatusIdentifier, throwable);
150 public void onSuccess(@Nullable Object nullResult) {
151 openflowDiagStatusProvider.reportStatus(diagStatusIdentifier, ServiceState.ERROR,
152 threadName + " terminated");
154 } , MoreExecutors.directExecutor());
155 return serverFacade.getIsOnlineFuture();
156 } catch (RuntimeException e) {
157 return Futures.immediateFailedFuture(e);
161 private ServerFacade createAndConfigureServer() {
162 LOG.debug("Configuring ..");
164 final ChannelInitializerFactory factory = new ChannelInitializerFactory();
165 factory.setSwitchConnectionHandler(switchConnectionHandler);
166 factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout());
167 factory.setTlsConfig(connConfig.getTlsConfiguration());
168 factory.setSerializationFactory(serializationFactory);
169 factory.setDeserializationFactory(deserializationFactory);
170 factory.setUseBarrier(connConfig.useBarrier());
171 factory.setChannelOutboundQueueSize(connConfig.getChannelOutboundQueueSize());
172 final TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
173 // Check if Epoll native transport is available.
174 // TODO : Add option to disable Epoll.
175 boolean isEpollEnabled = Epoll.isAvailable();
177 if (TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol)) {
178 server = new TcpHandler(connConfig.getAddress(), connConfig.getPort(), () ->
179 openflowDiagStatusProvider.reportStatus(diagStatusIdentifier, ServiceState.OPERATIONAL));
180 final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer();
181 ((TcpHandler) server).setChannelInitializer(channelInitializer);
182 ((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
183 final EventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup();
184 connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, isEpollEnabled);
185 connectionInitializer.setChannelInitializer(channelInitializer);
186 connectionInitializer.run();
187 } else if (TransportProtocol.UDP.equals(transportProtocol)) {
188 server = new UdpHandler(connConfig.getAddress(), connConfig.getPort(), () ->
189 openflowDiagStatusProvider.reportStatus(diagStatusIdentifier, ServiceState.OPERATIONAL));
190 ((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
191 ((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer());
193 throw new IllegalStateException("Unknown transport protocol received: " + transportProtocol);
195 server.setThreadConfig(connConfig.getThreadConfiguration());
199 public ServerFacade getServerFacade() {
204 public void close() {
209 public boolean unregisterSerializer(final ExperimenterSerializerKey key) {
210 return serializerRegistry.unregisterSerializer((MessageTypeKey<?>) key);
214 public boolean unregisterDeserializer(final ExperimenterDeserializerKey key) {
215 return deserializerRegistry.unregisterDeserializer((MessageCodeKey) key);
219 public void registerActionSerializer(final ActionSerializerKey<?> key,
220 final OFGeneralSerializer serializer) {
221 serializerRegistry.registerSerializer(key, serializer);
225 public void registerActionDeserializer(final ExperimenterActionDeserializerKey key,
226 final OFGeneralDeserializer deserializer) {
227 deserializerRegistry.registerDeserializer(key, deserializer);
231 public void registerInstructionSerializer(final InstructionSerializerKey<?> key,
232 final OFGeneralSerializer serializer) {
233 serializerRegistry.registerSerializer(key, serializer);
237 public void registerInstructionDeserializer(final ExperimenterInstructionDeserializerKey key,
238 final OFGeneralDeserializer deserializer) {
239 deserializerRegistry.registerDeserializer(key, deserializer);
243 public <C extends OxmClassBase, F extends MatchField> void registerMatchEntrySerializer(
244 final MatchEntrySerializerKey<C, F> key, final OFGeneralSerializer serializer) {
245 serializerRegistry.registerSerializer(key, serializer);
249 public void registerMatchEntryDeserializer(final MatchEntryDeserializerKey key,
250 final OFGeneralDeserializer deserializer) {
251 deserializerRegistry.registerDeserializer(key, deserializer);
255 public void registerErrorDeserializer(final ExperimenterIdDeserializerKey key,
256 final OFDeserializer<ErrorMessage> deserializer) {
257 deserializerRegistry.registerDeserializer(key, deserializer);
261 public void registerExperimenterMessageDeserializer(ExperimenterIdDeserializerKey key,
262 OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
263 deserializerRegistry.registerDeserializer(key, deserializer);
267 public void registerMultipartReplyMessageDeserializer(ExperimenterIdDeserializerKey key,
268 OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
269 deserializerRegistry.registerDeserializer(key, deserializer);
273 public void registerMultipartReplyTFDeserializer(final ExperimenterIdDeserializerKey key,
274 final OFGeneralDeserializer deserializer) {
275 deserializerRegistry.registerDeserializer(key, deserializer);
279 public void registerQueuePropertyDeserializer(final ExperimenterIdDeserializerKey key,
280 final OFDeserializer<QueueProperty> deserializer) {
281 deserializerRegistry.registerDeserializer(key, deserializer);
285 public void registerMeterBandDeserializer(final ExperimenterIdDeserializerKey key,
286 final OFDeserializer<MeterBandExperimenterCase> deserializer) {
287 deserializerRegistry.registerDeserializer(key, deserializer);
291 public void registerExperimenterMessageSerializer(
292 ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
293 OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
294 serializerRegistry.registerSerializer(key, serializer);
298 public void registerMultipartRequestSerializer(ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
299 OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
300 serializerRegistry.registerSerializer(key, serializer);
304 public void registerMultipartRequestTFSerializer(final ExperimenterIdSerializerKey<TableFeatureProperties> key,
305 final OFGeneralSerializer serializer) {
306 serializerRegistry.registerSerializer(key, serializer);
312 * @deprecated Since we have used ExperimenterIdMeterSubTypeSerializerKey as MeterBandSerializer's key, in order
313 * to avoid the occurrence of an error, we should discard this function.
317 public void registerMeterBandSerializer(final ExperimenterIdSerializerKey<MeterBandExperimenterCase> key,
318 final OFSerializer<MeterBandExperimenterCase> serializer) {
319 serializerRegistry.registerSerializer(key, serializer);
323 public void registerMeterBandSerializer(
324 final ExperimenterIdMeterSubTypeSerializerKey<MeterBandExperimenterCase> key,
325 final OFSerializer<MeterBandExperimenterCase> serializer) {
326 serializerRegistry.registerSerializer(key, serializer);
330 public void initiateConnection(final String host, final int port) {
331 connectionInitializer.initiateConnection(host, port);
335 public ConnectionConfiguration getConfiguration() {
336 return this.connConfig;
340 public <K> void registerSerializer(MessageTypeKey<K> key, OFGeneralSerializer serializer) {
341 serializerRegistry.registerSerializer(key, serializer);
345 public void registerDeserializer(MessageCodeKey key, OFGeneralDeserializer deserializer) {
346 deserializerRegistry.registerDeserializer(key, deserializer);
350 public void registerDeserializerMapping(final TypeToClassKey key, final Class<?> clazz) {
351 deserializationFactory.registerMapping(key, clazz);
355 public boolean unregisterDeserializerMapping(final TypeToClassKey key) {
356 return deserializationFactory.unregisterMapping(key);