Improve property-based configuration
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import io.netty.util.HashedWheelTimer;
15 import java.lang.management.ManagementFactory;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Objects;
20 import java.util.concurrent.SynchronousQueue;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.function.Consumer;
24 import java.util.stream.Collectors;
25 import javax.annotation.Nonnull;
26 import javax.management.InstanceAlreadyExistsException;
27 import javax.management.MBeanRegistrationException;
28 import javax.management.MBeanServer;
29 import javax.management.MalformedObjectNameException;
30 import javax.management.NotCompliantMBeanException;
31 import javax.management.ObjectName;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
36 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
37 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
38 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
39 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginConfigurationService;
40 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
43 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
44 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
45 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
49 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
50 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
51 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
52 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
53 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
54 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
55 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
56 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
57 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
58 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
59 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
60 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
61 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
62 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
63 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
64 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
65 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
66 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
67 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginConfigurationService, OpenFlowPluginExtensionRegistratorProvider {
72
73     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
74     private static final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl();
75     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
76     private static final long TICK_DURATION = 10;
77     private static final String POOL_NAME = "ofppool";
78
79     private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
80     private final NotificationService notificationProviderService;
81     private final NotificationPublishService notificationPublishService;
82     private final ExtensionConverterManager extensionConverterManager;
83     private final DataBroker dataBroker;
84     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
85     private final DeviceInitializerProvider deviceInitializerProvider;
86     private final ConvertorManager convertorManager;
87     private final ContextChainHolder contextChainHolder;private int rpcRequestsQuota;
88     private long globalNotificationQuota;
89     private long barrierInterval;
90     private int barrierCountLimit;
91     private long echoReplyTimeout;
92     private DeviceManager deviceManager;
93     private RpcManager rpcManager;
94     private RpcProviderRegistry rpcProviderRegistry;
95     private StatisticsManager statisticsManager;
96     private ConnectionManager connectionManager;
97     private boolean switchFeaturesMandatory;
98     private boolean isStatisticsPollingOn;
99     private boolean isStatisticsRpcEnabled;
100     private boolean isFlowRemovedNotificationOn;
101     private boolean skipTableFeatures;
102     private long basicTimerDelay;
103     private long maximumTimerDelay;
104     private boolean useSingleLayerSerialization;
105     private ThreadPoolExecutor threadPool;
106     private ClusterSingletonServiceProvider singletonServicesProvider;
107     private int threadPoolMinThreads;
108     private int threadPoolMaxThreads;
109     private long threadPoolTimeout;
110     private boolean initialized = false;
111
112     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
113         return messageIntelligenceAgency;
114     }
115
116     public OpenFlowPluginProviderImpl(final List<SwitchConnectionProvider> switchConnectionProviders,
117                                       final DataBroker dataBroker,
118                                       final RpcProviderRegistry rpcProviderRegistry,
119                                       final NotificationService notificationProviderService,
120                                       final NotificationPublishService notificationPublishService,
121                                       final ClusterSingletonServiceProvider singletonServiceProvider,
122                                       final EntityOwnershipService entityOwnershipService) {
123         this.switchConnectionProviders = switchConnectionProviders;
124         this.dataBroker = dataBroker;
125         this.rpcProviderRegistry = rpcProviderRegistry;
126         this.notificationProviderService = notificationProviderService;
127         this.notificationPublishService = notificationPublishService;
128         this.singletonServicesProvider = singletonServiceProvider;
129         convertorManager = ConvertorManagerFactory.createDefaultManager();
130         contextChainHolder = new ContextChainHolderImpl(hashedWheelTimer);
131         contextChainHolder.changeEntityOwnershipService(entityOwnershipService);
132         extensionConverterManager = new ExtensionConverterManagerImpl();
133         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
134     }
135
136
137     private void startSwitchConnections() {
138         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
139             // Inject OpenflowPlugin custom serializers and deserializers into OpenflowJava
140             if (useSingleLayerSerialization) {
141                 SerializerInjector.injectSerializers(switchConnectionProvider);
142                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
143             }
144
145             // Set handler of incoming connections and start switch connection provider
146             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
147             return switchConnectionProvider.startup();
148         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
149             @Override
150             public void onSuccess(final List<Boolean> result) {
151                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
152             }
153
154             @Override
155             public void onFailure(@Nonnull final Throwable t) {
156                 LOG.warn("Some switchConnectionProviders failed to start.", t);
157             }
158         });
159     }
160
161     @Override
162     public void initialize() {
163         Preconditions.checkNotNull(dataBroker, "missing data broker");
164         Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry");
165         Preconditions.checkNotNull(notificationProviderService, "missing notification provider service");
166         Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider");
167
168         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
169         // TODO: rewrite later!
170         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
171
172         // Creates a thread pool that creates new threads as needed, but will reuse previously
173         // constructed threads when they are available.
174         // Threads that have not been used for x seconds are terminated and removed from the cache.
175         threadPool = new ThreadPoolLoggingExecutor(
176                 Preconditions.checkNotNull(threadPoolMinThreads),
177                 Preconditions.checkNotNull(threadPoolMaxThreads),
178                 Preconditions.checkNotNull(threadPoolTimeout),
179                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
180
181         connectionManager = new ConnectionManagerImpl(threadPool);
182         connectionManager.setEchoReplyTimeout(echoReplyTimeout);
183
184         registerMXBean(messageIntelligenceAgency);
185
186         contextChainHolder.addSingletonServicesProvider(singletonServicesProvider);
187
188         deviceManager = new DeviceManagerImpl(
189                 dataBroker,
190                 getMessageIntelligenceAgency(),
191                 notificationPublishService,
192                 hashedWheelTimer,
193                 convertorManager,
194                 deviceInitializerProvider);
195
196         deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
197         deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
198         deviceManager.setBarrierInterval(barrierInterval);
199         deviceManager.setBarrierCountLimit(barrierCountLimit);
200         deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
201         deviceManager.setSkipTableFeatures(skipTableFeatures);
202         deviceManager.setUseSingleLayerSerialization(useSingleLayerSerialization);
203
204         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
205
206         rpcManager = new RpcManagerImpl(rpcProviderRegistry, extensionConverterManager, convertorManager, notificationPublishService);
207         rpcManager.setRpcRequestQuota(rpcRequestsQuota);
208
209         statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, hashedWheelTimer, convertorManager);
210         statisticsManager.setBasicTimerDelay(basicTimerDelay);
211         statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
212         statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
213
214         // Device connection handler moved from device manager to context holder
215         connectionManager.setDeviceConnectedHandler(contextChainHolder);
216
217         /* Termination Phase ordering - OFP Device Context suite */
218         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
219
220         rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
221
222         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
223         deviceManager.initialize();
224
225         contextChainHolder.addManager(deviceManager);
226         contextChainHolder.addManager(statisticsManager);
227         contextChainHolder.addManager(rpcManager);
228
229         startSwitchConnections();
230         initialized = true;
231     }
232
233
234     @Override
235     public void update(@Nonnull final Map<String, Object> properties) {
236         properties.forEach((key, value) -> {
237             final PropertyType propertyType = PropertyType.forValue(key);
238
239             if (Objects.nonNull(propertyType)) {
240                 updateProperty(propertyType, value);
241             } else if (!key.equals("service.pid") && !key.equals("felix.fileinstall.filename")) {
242                 LOG.warn("Unsupported configuration property '{}={}'", key, value);
243             }
244         });
245     }
246
247     private void doPropertyUpdate(final PropertyType propertyType,
248                                   final boolean modifiable,
249                                   final Object origValue,
250                                   final Object newValue,
251                                   final Consumer<Object> successCallback) {
252         if (initialized) {
253             if (Objects.equals(origValue, newValue)) {
254                 LOG.debug("{} config parameter is already set to {})", propertyType, origValue);
255                 return;
256             } else if (!modifiable) {
257                 LOG.warn("{} update ({} -> {}) is not allowed after controller start", propertyType, origValue, newValue);
258                 return;
259             }
260         }
261
262         successCallback.accept(newValue);
263         LOG.info("{} config parameter is updated ({} -> {})", propertyType, origValue, newValue);
264     }
265
266     @Override
267     public void updateProperty(@Nonnull final PropertyType key, @Nonnull final Object value) {
268         try {
269             final String sValue = value.toString();
270             final Consumer<Object> successCallback;
271             final boolean modifiable;
272             final Object oldValue;
273             final Object newValue;
274
275             switch (key) {
276                 case RPC_REQUESTS_QUOTA:
277                     successCallback = (result) -> {
278                         rpcRequestsQuota = (int) result;
279
280                         if (initialized) {
281                             rpcManager.setRpcRequestQuota(rpcRequestsQuota);
282                         }
283                     };
284
285                     oldValue = rpcRequestsQuota;
286                     newValue = Integer.valueOf(sValue);
287                     modifiable = true;
288                     break;
289                 case SWITCH_FEATURES_MANDATORY:
290                     successCallback = (result) -> {
291                         switchFeaturesMandatory = (boolean) result;
292
293                         if (initialized) {
294                             deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
295                         }
296                     };
297
298                     oldValue = switchFeaturesMandatory;
299                     newValue = Boolean.valueOf(sValue);
300                     modifiable = true;
301                     break;
302                 case GLOBAL_NOTIFICATION_QUOTA:
303                     successCallback = (result) -> {
304                         globalNotificationQuota = (long) result;
305
306                         if (initialized) {
307                             deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
308                         }
309                     };
310
311                     oldValue = globalNotificationQuota;
312                     newValue = Long.valueOf(sValue);
313                     modifiable = true;
314                     break;
315                 case IS_STATISTICS_POLLING_ON:
316                     successCallback = (result) -> {
317                         isStatisticsPollingOn = (boolean) result;
318
319                         if (initialized) {
320                             statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
321                         }
322                     };
323
324                     oldValue = isStatisticsPollingOn;
325                     newValue = Boolean.valueOf(sValue);
326                     modifiable = true;
327                     break;
328                 case IS_STATISTICS_RPC_ENABLED:
329                     successCallback = (result) -> {
330                         isStatisticsRpcEnabled = (boolean) result;
331
332                         if (initialized) {
333                             rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
334                         }
335                     };
336
337                     oldValue = isStatisticsRpcEnabled;
338                     newValue = Boolean.valueOf(sValue);
339                     modifiable = true;
340                     break;
341                 case BARRIER_INTERVAL_TIMEOUT_LIMIT:
342                     successCallback = (result) -> {
343                         barrierInterval = (long) result;
344
345                         if (initialized) {
346                             deviceManager.setBarrierInterval(barrierInterval);
347                         }
348                     };
349
350                     oldValue = barrierInterval;
351                     newValue = Long.valueOf(sValue);
352                     modifiable = true;
353                     break;
354                 case BARRIER_COUNT_LIMIT:
355                     successCallback = (result) -> {
356                         barrierCountLimit = (int) result;
357
358                         if (initialized) {
359                             deviceManager.setBarrierCountLimit(barrierCountLimit);
360                         }
361                     };
362
363                     oldValue = barrierCountLimit;
364                     newValue = Integer.valueOf(sValue);
365                     modifiable = true;
366                     break;
367                 case ECHO_REPLY_TIMEOUT:
368                     successCallback = (result) -> {
369                         echoReplyTimeout = (long) result;
370
371                         if (initialized) {
372                             connectionManager.setEchoReplyTimeout(echoReplyTimeout);
373                         }
374                     };
375
376                     oldValue = echoReplyTimeout;
377                     newValue = Long.valueOf(sValue);
378                     modifiable = true;
379                     break;
380                 case THREAD_POOL_MIN_THREADS:
381                     successCallback = (result) -> threadPoolMinThreads = (int) result;
382                     oldValue = threadPoolMinThreads;
383                     newValue = Integer.valueOf(sValue);
384                     modifiable = false;
385                     break;
386                 case THREAD_POOL_MAX_THREADS:
387                     successCallback = (result) -> threadPoolMaxThreads = (int) result;
388                     oldValue = threadPoolMaxThreads;
389                     newValue = Integer.valueOf(sValue);
390                     modifiable = false;
391                     break;
392                 case THREAD_POOL_TIMEOUT:
393                     successCallback = (result) -> threadPoolTimeout = (long) result;
394                     oldValue = threadPoolTimeout;
395                     newValue = Long.valueOf(sValue);
396                     modifiable = false;
397                     break;
398                 case ENABLE_FLOW_REMOVED_NOTIFICATION:
399                     successCallback = (result) -> {
400                         isFlowRemovedNotificationOn = (boolean) result;
401
402                         if (initialized) {
403                             deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
404                         }
405                     };
406
407                     oldValue = isFlowRemovedNotificationOn;
408                     newValue = Boolean.valueOf(sValue);
409                     modifiable = true;
410                     break;
411                 case SKIP_TABLE_FEATURES:
412                     successCallback = (result) -> {
413                         skipTableFeatures = (boolean) result;
414
415                         if (initialized) {
416                             deviceManager.setSkipTableFeatures(skipTableFeatures);
417                         }
418                     };
419
420                     oldValue = skipTableFeatures;
421                     newValue = Boolean.valueOf(sValue);
422                     modifiable = true;
423                     break;
424                 case BASIC_TIMER_DELAY:
425                     successCallback = (result) -> {
426                         basicTimerDelay = (long) result;
427
428                         if (initialized) {
429                             statisticsManager.setBasicTimerDelay(basicTimerDelay);
430                         }
431                     };
432
433                     oldValue = basicTimerDelay;
434                     newValue = Long.valueOf(sValue);
435                     modifiable = true;
436                     break;
437                 case MAXIMUM_TIMER_DELAY:
438                     successCallback = (result) -> {
439                         maximumTimerDelay = (long) result;
440
441                         if (initialized) {
442                             statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
443                         }
444                     };
445
446                     oldValue = maximumTimerDelay;
447                     newValue = Long.valueOf(sValue);
448                     modifiable = true;
449                     break;
450                 case USE_SINGLE_LAYER_SERIALIZATION:
451                     successCallback = (result) -> {
452                         useSingleLayerSerialization = (boolean) result;
453
454                         switchConnectionProviders.forEach(switchConnectionProvider -> {
455                             if (useSingleLayerSerialization) {
456                                 SerializerInjector.injectSerializers(switchConnectionProvider);
457                                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
458                             } else {
459                                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
460                             }
461                         });
462                     };
463
464                     oldValue = useSingleLayerSerialization;
465                     newValue = Boolean.valueOf(sValue);
466                     modifiable = true;
467                     break;
468                 default:
469                     LOG.warn("Unsupported configuration property '{}={}'", key, sValue);
470                     return;
471             }
472
473             doPropertyUpdate(key, modifiable, oldValue, newValue, successCallback);
474         } catch (final Exception ex) {
475             LOG.warn("Failed to read configuration property '{}={}', error: {}", key, value, ex);
476         }
477     }
478
479     @Override
480     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
481         return extensionConverterManager;
482     }
483
484     @Override
485     public void close() throws Exception {
486         initialized = false;
487         //TODO: consider wrapping each manager into try-catch
488         deviceManager.close();
489         rpcManager.close();
490         statisticsManager.close();
491
492         // Manually shutdown all remaining running threads in pool
493         threadPool.shutdown();
494     }
495
496     private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) {
497         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
498         try {
499             final String pathToMxBean = String.format("%s:type=%s",
500                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
501                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
502             final ObjectName name = new ObjectName(pathToMxBean);
503             mbs.registerMBean(messageIntelligenceAgency, name);
504         } catch (MalformedObjectNameException
505                 | NotCompliantMBeanException
506                 | MBeanRegistrationException
507                 | InstanceAlreadyExistsException e) {
508             LOG.warn("Error registering MBean {}", e);
509         }
510     }
511 }