58137e51012851421111090c50037ae353802fe9
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import io.netty.util.HashedWheelTimer;
15 import io.netty.util.Timer;
16 import java.lang.management.ManagementFactory;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Objects;
21 import java.util.concurrent.SynchronousQueue;
22 import java.util.concurrent.ThreadPoolExecutor;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import java.util.stream.Collectors;
26 import javax.annotation.Nonnull;
27 import javax.management.InstanceAlreadyExistsException;
28 import javax.management.InstanceNotFoundException;
29 import javax.management.MBeanRegistrationException;
30 import javax.management.MBeanServer;
31 import javax.management.MalformedObjectNameException;
32 import javax.management.NotCompliantMBeanException;
33 import javax.management.ObjectName;
34 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
35 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
37 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
38 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
39 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
40 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginConfigurationService;
41 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
42 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
44 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
45 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
48 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
49 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
50 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
51 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
52 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
53 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
54 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
55 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
56 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
57 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
58 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
59 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
60 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
61 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
62 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
63 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
64 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
65 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
66 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
67 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
68 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginConfigurationService, OpenFlowPluginExtensionRegistratorProvider {
73
74     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
75
76     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
77     private static final long TICK_DURATION = 10;
78     private static final String POOL_NAME = "ofppool";
79
80     private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
81     private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
82             .format("%s:type=%s",
83                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
84                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
85
86     private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
87     private final NotificationPublishService notificationPublishService;
88     private final ExtensionConverterManager extensionConverterManager;
89     private final DataBroker dataBroker;
90     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
91     private final DeviceInitializerProvider deviceInitializerProvider;
92     private final ConvertorManager convertorManager;
93     private final ContextChainHolder contextChainHolder;
94     private final RpcProviderRegistry rpcProviderRegistry;
95     private final ClusterSingletonServiceProvider singletonServicesProvider;
96     private int rpcRequestsQuota;
97     private long globalNotificationQuota;
98     private long barrierInterval;
99     private int barrierCountLimit;
100     private long echoReplyTimeout;
101     private DeviceManager deviceManager;
102     private RpcManager rpcManager;
103     private StatisticsManager statisticsManager;
104     private ConnectionManager connectionManager;
105     private boolean switchFeaturesMandatory;
106     private boolean isStatisticsPollingOn;
107     private boolean isStatisticsRpcEnabled;
108     private boolean isFlowRemovedNotificationOn;
109     private boolean skipTableFeatures;
110     private long basicTimerDelay;
111     private long maximumTimerDelay;
112     private boolean useSingleLayerSerialization;
113     private ThreadPoolExecutor threadPool;
114     private int threadPoolMinThreads;
115     private int threadPoolMaxThreads;
116     private long threadPoolTimeout;
117     private boolean initialized = false;
118
119     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
120         return MESSAGE_INTELLIGENCE_AGENCY;
121     }
122
123     OpenFlowPluginProviderImpl(final List<SwitchConnectionProvider> switchConnectionProviders,
124                                final DataBroker dataBroker,
125                                final RpcProviderRegistry rpcProviderRegistry,
126                                final NotificationPublishService notificationPublishService,
127                                final ClusterSingletonServiceProvider singletonServiceProvider,
128                                final EntityOwnershipService entityOwnershipService) {
129         this.switchConnectionProviders = switchConnectionProviders;
130         this.dataBroker = dataBroker;
131         this.rpcProviderRegistry = rpcProviderRegistry;
132         this.notificationPublishService = notificationPublishService;
133         this.singletonServicesProvider = singletonServiceProvider;
134         convertorManager = ConvertorManagerFactory.createDefaultManager();
135         contextChainHolder = new ContextChainHolderImpl(hashedWheelTimer);
136         contextChainHolder.changeEntityOwnershipService(entityOwnershipService);
137         extensionConverterManager = new ExtensionConverterManagerImpl();
138         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
139     }
140
141
142     private void startSwitchConnections() {
143         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
144             // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
145             if (useSingleLayerSerialization) {
146                 SerializerInjector.injectSerializers(switchConnectionProvider);
147                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
148             } else {
149                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
150             }
151
152             // Set handler of incoming connections and start switch connection provider
153             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
154             return switchConnectionProvider.startup();
155         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
156             @Override
157             public void onSuccess(final List<Boolean> result) {
158                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
159             }
160
161             @Override
162             public void onFailure(@Nonnull final Throwable throwable) {
163                 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
164             }
165         });
166     }
167
168     private void shutdownSwitchConnections() {
169         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
170             // Revert deserializers to their original state
171             if (useSingleLayerSerialization) {
172                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
173             }
174
175             // Shutdown switch connection provider
176             return switchConnectionProvider.shutdown();
177         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
178             @Override
179             public void onSuccess(final List<Boolean> result) {
180                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
181             }
182
183             @Override
184             public void onFailure(@Nonnull final Throwable throwable) {
185                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
186             }
187         });
188     }
189
190     @Override
191     public void initialize() {
192         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
193         // TODO: rewrite later!
194         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
195
196         // Creates a thread pool that creates new threads as needed, but will reuse previously
197         // constructed threads when they are available.
198         // Threads that have not been used for x seconds are terminated and removed from the cache.
199         threadPool = new ThreadPoolLoggingExecutor(
200                 Preconditions.checkNotNull(threadPoolMinThreads),
201                 Preconditions.checkNotNull(threadPoolMaxThreads),
202                 Preconditions.checkNotNull(threadPoolTimeout),
203                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
204
205         connectionManager = new ConnectionManagerImpl(threadPool);
206         connectionManager.setEchoReplyTimeout(echoReplyTimeout);
207
208         registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
209
210         contextChainHolder.addSingletonServicesProvider(singletonServicesProvider);
211
212         deviceManager = new DeviceManagerImpl(
213                 dataBroker,
214                 getMessageIntelligenceAgency(),
215                 notificationPublishService,
216                 hashedWheelTimer,
217                 convertorManager,
218                 deviceInitializerProvider,
219                 useSingleLayerSerialization);
220
221         deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
222         deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
223         deviceManager.setBarrierInterval(barrierInterval);
224         deviceManager.setBarrierCountLimit(barrierCountLimit);
225         deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
226         deviceManager.setSkipTableFeatures(skipTableFeatures);
227
228         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
229
230         rpcManager = new RpcManagerImpl(rpcProviderRegistry, extensionConverterManager, convertorManager, notificationPublishService);
231         rpcManager.setRpcRequestQuota(rpcRequestsQuota);
232
233         statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, hashedWheelTimer, convertorManager);
234         statisticsManager.setBasicTimerDelay(basicTimerDelay);
235         statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
236         statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
237
238         // Device connection handler moved from device manager to context holder
239         connectionManager.setDeviceConnectedHandler(contextChainHolder);
240
241         /* Termination Phase ordering - OFP Device Context suite */
242         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
243
244         rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
245
246         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
247         deviceManager.initialize();
248
249         contextChainHolder.addManager(deviceManager);
250         contextChainHolder.addManager(statisticsManager);
251         contextChainHolder.addManager(rpcManager);
252
253         startSwitchConnections();
254         initialized = true;
255     }
256
257
258     @Override
259     public void update(@Nonnull final Map<String, Object> properties) {
260         properties.forEach((key, value) -> {
261             final PropertyType propertyType = PropertyType.forValue(key);
262
263             if (Objects.nonNull(propertyType)) {
264                 updateProperty(propertyType, value);
265             }
266         });
267     }
268
269     private void doPropertyUpdate(final PropertyType propertyType,
270                                   final boolean modifiable,
271                                   final Object origValue,
272                                   final Object newValue,
273                                   final Consumer<Object> successCallback) {
274         if (initialized) {
275             if (Objects.equals(origValue, newValue)) {
276                 LOG.debug("{} config parameter is already set to {})", propertyType, origValue);
277                 return;
278             } else if (!modifiable) {
279                 LOG.warn("{} update ({} -> {}) is not allowed after controller start", propertyType, origValue, newValue);
280                 return;
281             }
282         }
283
284         successCallback.accept(newValue);
285         LOG.info("{} config parameter is updated ({} -> {})", propertyType, origValue, newValue);
286     }
287
288     @Override
289     public void updateProperty(@Nonnull final PropertyType key, @Nonnull final Object value) {
290         try {
291             final String sValue = value.toString();
292             final Consumer<Object> successCallback;
293             final boolean modifiable;
294             final Object oldValue;
295             final Object newValue;
296
297             switch (key) {
298                 case RPC_REQUESTS_QUOTA:
299                     successCallback = (result) -> {
300                         rpcRequestsQuota = (int) result;
301
302                         if (initialized) {
303                             rpcManager.setRpcRequestQuota(rpcRequestsQuota);
304                         }
305                     };
306
307                     oldValue = rpcRequestsQuota;
308                     newValue = Integer.valueOf(sValue);
309                     modifiable = true;
310                     break;
311                 case SWITCH_FEATURES_MANDATORY:
312                     successCallback = (result) -> {
313                         switchFeaturesMandatory = (boolean) result;
314
315                         if (initialized) {
316                             deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
317                         }
318                     };
319
320                     oldValue = switchFeaturesMandatory;
321                     newValue = Boolean.valueOf(sValue);
322                     modifiable = true;
323                     break;
324                 case GLOBAL_NOTIFICATION_QUOTA:
325                     successCallback = (result) -> {
326                         globalNotificationQuota = (long) result;
327
328                         if (initialized) {
329                             deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
330                         }
331                     };
332
333                     oldValue = globalNotificationQuota;
334                     newValue = Long.valueOf(sValue);
335                     modifiable = true;
336                     break;
337                 case IS_STATISTICS_POLLING_ON:
338                     successCallback = (result) -> {
339                         isStatisticsPollingOn = (boolean) result;
340
341                         if (initialized) {
342                             statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
343                         }
344                     };
345
346                     oldValue = isStatisticsPollingOn;
347                     newValue = Boolean.valueOf(sValue);
348                     modifiable = true;
349                     break;
350                 case IS_STATISTICS_RPC_ENABLED:
351                     successCallback = (result) -> {
352                         isStatisticsRpcEnabled = (boolean) result;
353
354                         if (initialized) {
355                             rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
356                         }
357                     };
358
359                     oldValue = isStatisticsRpcEnabled;
360                     newValue = Boolean.valueOf(sValue);
361                     modifiable = true;
362                     break;
363                 case BARRIER_INTERVAL_TIMEOUT_LIMIT:
364                     successCallback = (result) -> {
365                         barrierInterval = (long) result;
366
367                         if (initialized) {
368                             deviceManager.setBarrierInterval(barrierInterval);
369                         }
370                     };
371
372                     oldValue = barrierInterval;
373                     newValue = Long.valueOf(sValue);
374                     modifiable = true;
375                     break;
376                 case BARRIER_COUNT_LIMIT:
377                     successCallback = (result) -> {
378                         barrierCountLimit = (int) result;
379
380                         if (initialized) {
381                             deviceManager.setBarrierCountLimit(barrierCountLimit);
382                         }
383                     };
384
385                     oldValue = barrierCountLimit;
386                     newValue = Integer.valueOf(sValue);
387                     modifiable = true;
388                     break;
389                 case ECHO_REPLY_TIMEOUT:
390                     successCallback = (result) -> {
391                         echoReplyTimeout = (long) result;
392
393                         if (initialized) {
394                             connectionManager.setEchoReplyTimeout(echoReplyTimeout);
395                         }
396                     };
397
398                     oldValue = echoReplyTimeout;
399                     newValue = Long.valueOf(sValue);
400                     modifiable = true;
401                     break;
402                 case THREAD_POOL_MIN_THREADS:
403                     successCallback = (result) -> threadPoolMinThreads = (int) result;
404                     oldValue = threadPoolMinThreads;
405                     newValue = Integer.valueOf(sValue);
406                     modifiable = false;
407                     break;
408                 case THREAD_POOL_MAX_THREADS:
409                     successCallback = (result) -> threadPoolMaxThreads = (int) result;
410                     oldValue = threadPoolMaxThreads;
411                     newValue = Integer.valueOf(sValue);
412                     modifiable = false;
413                     break;
414                 case THREAD_POOL_TIMEOUT:
415                     successCallback = (result) -> threadPoolTimeout = (long) result;
416                     oldValue = threadPoolTimeout;
417                     newValue = Long.valueOf(sValue);
418                     modifiable = false;
419                     break;
420                 case ENABLE_FLOW_REMOVED_NOTIFICATION:
421                     successCallback = (result) -> {
422                         isFlowRemovedNotificationOn = (boolean) result;
423
424                         if (initialized) {
425                             deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
426                         }
427                     };
428
429                     oldValue = isFlowRemovedNotificationOn;
430                     newValue = Boolean.valueOf(sValue);
431                     modifiable = true;
432                     break;
433                 case SKIP_TABLE_FEATURES:
434                     successCallback = (result) -> {
435                         skipTableFeatures = (boolean) result;
436
437                         if (initialized) {
438                             deviceManager.setSkipTableFeatures(skipTableFeatures);
439                         }
440                     };
441
442                     oldValue = skipTableFeatures;
443                     newValue = Boolean.valueOf(sValue);
444                     modifiable = true;
445                     break;
446                 case BASIC_TIMER_DELAY:
447                     successCallback = (result) -> {
448                         basicTimerDelay = (long) result;
449
450                         if (initialized) {
451                             statisticsManager.setBasicTimerDelay(basicTimerDelay);
452                         }
453                     };
454
455                     oldValue = basicTimerDelay;
456                     newValue = Long.valueOf(sValue);
457                     modifiable = true;
458                     break;
459                 case MAXIMUM_TIMER_DELAY:
460                     successCallback = (result) -> {
461                         maximumTimerDelay = (long) result;
462
463                         if (initialized) {
464                             statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
465                         }
466                     };
467
468                     oldValue = maximumTimerDelay;
469                     newValue = Long.valueOf(sValue);
470                     modifiable = true;
471                     break;
472                 case USE_SINGLE_LAYER_SERIALIZATION:
473                     successCallback = (result) -> useSingleLayerSerialization = (boolean) result;
474                     oldValue = useSingleLayerSerialization;
475                     newValue = Boolean.valueOf(sValue);
476                     modifiable = false;
477                     break;
478                 default:
479                     return;
480             }
481
482             doPropertyUpdate(key, modifiable, oldValue, newValue, successCallback);
483         } catch (final Exception ex) {
484             LOG.warn("Failed to read configuration property '{}={}', error: {}", key, value, ex);
485         }
486     }
487
488     @Override
489     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
490         return extensionConverterManager;
491     }
492
493     @Override
494     public void close() throws Exception {
495         initialized = false;
496         gracefulShutdown(contextChainHolder);
497         gracefulShutdown(deviceManager);
498         gracefulShutdown(rpcManager);
499         gracefulShutdown(statisticsManager);
500         gracefulShutdown(threadPool);
501         gracefulShutdown(hashedWheelTimer);
502         shutdownSwitchConnections();
503         unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
504     }
505
506     private static void gracefulShutdown(final AutoCloseable closeable) {
507         if (Objects.isNull(closeable)) {
508             return;
509         }
510
511         try {
512             closeable.close();
513         } catch (Exception e) {
514             LOG.warn("Failed to shutdown {} gracefully.", closeable);
515         }
516     }
517
518     private static void gracefulShutdown(final Timer timer) {
519         if (Objects.isNull(timer)) {
520             return;
521         }
522
523         try {
524             timer.stop();
525         } catch (Exception e) {
526             LOG.warn("Failed to shutdown {} gracefully.", timer);
527         }
528     }
529
530     private static void gracefulShutdown(final ThreadPoolExecutor threadPoolExecutor) {
531         if (Objects.isNull(threadPoolExecutor)) {
532             return;
533         }
534
535         try {
536             threadPoolExecutor.shutdown();
537         } catch (Exception e) {
538             LOG.warn("Failed to shutdown {} gracefully.", threadPoolExecutor);
539         }
540     }
541
542     private static void registerMXBean(final Object bean, final String beanName) {
543         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
544
545         try {
546             mbs.registerMBean(bean, new ObjectName(beanName));
547         } catch (MalformedObjectNameException
548                 | NotCompliantMBeanException
549                 | MBeanRegistrationException
550                 | InstanceAlreadyExistsException e) {
551             LOG.warn("Error registering MBean {}", e);
552         }
553     }
554
555     private static void unregisterMXBean(final String beanName) {
556         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
557
558         try {
559             mbs.unregisterMBean(new ObjectName(beanName));
560         } catch (InstanceNotFoundException
561                 | MBeanRegistrationException
562                 | MalformedObjectNameException e) {
563             LOG.warn("Error unregistering MBean {}", e);
564         }
565     }
566 }