Add timeout for device initialization
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import io.netty.util.HashedWheelTimer;
15 import java.lang.management.ManagementFactory;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Objects;
20 import java.util.concurrent.SynchronousQueue;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.function.Consumer;
24 import java.util.stream.Collectors;
25 import javax.annotation.Nonnull;
26 import javax.management.InstanceAlreadyExistsException;
27 import javax.management.MBeanRegistrationException;
28 import javax.management.MBeanServer;
29 import javax.management.MalformedObjectNameException;
30 import javax.management.NotCompliantMBeanException;
31 import javax.management.ObjectName;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
36 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
37 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
38 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
39 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginConfigurationService;
40 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
43 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
44 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
45 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
49 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
50 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
51 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
52 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
53 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
54 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
55 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
56 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
57 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
58 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
59 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
60 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
61 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
62 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
63 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
64 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
65 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
66 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
67 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginConfigurationService, OpenFlowPluginExtensionRegistratorProvider {
72
73     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
74     private static final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl();
75     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
76     private static final long TICK_DURATION = 10;
77     private static final String POOL_NAME = "ofppool";
78
79     private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
80     private final NotificationService notificationProviderService;
81     private final NotificationPublishService notificationPublishService;
82     private final ExtensionConverterManager extensionConverterManager;
83     private final DataBroker dataBroker;
84     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
85     private final DeviceInitializerProvider deviceInitializerProvider;
86     private final ConvertorManager convertorManager;
87     private final ContextChainHolder contextChainHolder;
88     private int rpcRequestsQuota;
89     private long globalNotificationQuota;
90     private long barrierInterval;
91     private int barrierCountLimit;
92     private long echoReplyTimeout;
93     private DeviceManager deviceManager;
94     private RpcManager rpcManager;
95     private RpcProviderRegistry rpcProviderRegistry;
96     private StatisticsManager statisticsManager;
97     private ConnectionManager connectionManager;
98     private boolean switchFeaturesMandatory;
99     private boolean isStatisticsPollingOn;
100     private boolean isStatisticsRpcEnabled;
101     private boolean isFlowRemovedNotificationOn;
102     private boolean skipTableFeatures;
103     private long basicTimerDelay;
104     private long maximumTimerDelay;
105     private boolean useSingleLayerSerialization;
106     private ThreadPoolExecutor threadPool;
107     private ClusterSingletonServiceProvider singletonServicesProvider;
108     private int threadPoolMinThreads;
109     private int threadPoolMaxThreads;
110     private long threadPoolTimeout;
111     private boolean initialized = false;
112
113     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
114         return messageIntelligenceAgency;
115     }
116
117     public OpenFlowPluginProviderImpl(final List<SwitchConnectionProvider> switchConnectionProviders,
118                                       final DataBroker dataBroker,
119                                       final RpcProviderRegistry rpcProviderRegistry,
120                                       final NotificationService notificationProviderService,
121                                       final NotificationPublishService notificationPublishService,
122                                       final ClusterSingletonServiceProvider singletonServiceProvider,
123                                       final EntityOwnershipService entityOwnershipService) {
124         this.switchConnectionProviders = switchConnectionProviders;
125         this.dataBroker = dataBroker;
126         this.rpcProviderRegistry = rpcProviderRegistry;
127         this.notificationProviderService = notificationProviderService;
128         this.notificationPublishService = notificationPublishService;
129         this.singletonServicesProvider = singletonServiceProvider;
130         convertorManager = ConvertorManagerFactory.createDefaultManager();
131         contextChainHolder = new ContextChainHolderImpl(hashedWheelTimer);
132         contextChainHolder.changeEntityOwnershipService(entityOwnershipService);
133         extensionConverterManager = new ExtensionConverterManagerImpl();
134         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
135     }
136
137
138     private void startSwitchConnections() {
139         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
140             // Inject OpenflowPlugin custom serializers and deserializers into OpenflowJava
141             if (useSingleLayerSerialization) {
142                 SerializerInjector.injectSerializers(switchConnectionProvider);
143                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
144             } else {
145                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
146             }
147
148             // Set handler of incoming connections and start switch connection provider
149             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
150             return switchConnectionProvider.startup();
151         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
152             @Override
153             public void onSuccess(final List<Boolean> result) {
154                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
155             }
156
157             @Override
158             public void onFailure(@Nonnull final Throwable t) {
159                 LOG.warn("Some switchConnectionProviders failed to start.", t);
160             }
161         });
162     }
163
164     @Override
165     public void initialize() {
166         Preconditions.checkNotNull(dataBroker, "missing data broker");
167         Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry");
168         Preconditions.checkNotNull(notificationProviderService, "missing notification provider service");
169         Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider");
170
171         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
172         // TODO: rewrite later!
173         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
174
175         // Creates a thread pool that creates new threads as needed, but will reuse previously
176         // constructed threads when they are available.
177         // Threads that have not been used for x seconds are terminated and removed from the cache.
178         threadPool = new ThreadPoolLoggingExecutor(
179                 Preconditions.checkNotNull(threadPoolMinThreads),
180                 Preconditions.checkNotNull(threadPoolMaxThreads),
181                 Preconditions.checkNotNull(threadPoolTimeout),
182                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
183
184         connectionManager = new ConnectionManagerImpl(threadPool);
185         connectionManager.setEchoReplyTimeout(echoReplyTimeout);
186
187         registerMXBean(messageIntelligenceAgency);
188
189         contextChainHolder.addSingletonServicesProvider(singletonServicesProvider);
190
191         deviceManager = new DeviceManagerImpl(
192                 dataBroker,
193                 getMessageIntelligenceAgency(),
194                 notificationPublishService,
195                 hashedWheelTimer,
196                 convertorManager,
197                 deviceInitializerProvider,
198                 useSingleLayerSerialization);
199
200         deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
201         deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
202         deviceManager.setBarrierInterval(barrierInterval);
203         deviceManager.setBarrierCountLimit(barrierCountLimit);
204         deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
205         deviceManager.setSkipTableFeatures(skipTableFeatures);
206
207         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
208
209         rpcManager = new RpcManagerImpl(rpcProviderRegistry, extensionConverterManager, convertorManager, notificationPublishService);
210         rpcManager.setRpcRequestQuota(rpcRequestsQuota);
211
212         statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, hashedWheelTimer, convertorManager);
213         statisticsManager.setBasicTimerDelay(basicTimerDelay);
214         statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
215         statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
216
217         // Device connection handler moved from device manager to context holder
218         connectionManager.setDeviceConnectedHandler(contextChainHolder);
219
220         /* Termination Phase ordering - OFP Device Context suite */
221         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
222
223         rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
224
225         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
226         deviceManager.initialize();
227
228         contextChainHolder.addManager(deviceManager);
229         contextChainHolder.addManager(statisticsManager);
230         contextChainHolder.addManager(rpcManager);
231
232         startSwitchConnections();
233         initialized = true;
234     }
235
236
237     @Override
238     public void update(@Nonnull final Map<String, Object> properties) {
239         properties.forEach((key, value) -> {
240             final PropertyType propertyType = PropertyType.forValue(key);
241
242             if (Objects.nonNull(propertyType)) {
243                 updateProperty(propertyType, value);
244             } else if (!key.equals("service.pid") && !key.equals("felix.fileinstall.filename")) {
245                 LOG.warn("Unsupported configuration property '{}={}'", key, value);
246             }
247         });
248     }
249
250     private void doPropertyUpdate(final PropertyType propertyType,
251                                   final boolean modifiable,
252                                   final Object origValue,
253                                   final Object newValue,
254                                   final Consumer<Object> successCallback) {
255         if (initialized) {
256             if (Objects.equals(origValue, newValue)) {
257                 LOG.debug("{} config parameter is already set to {})", propertyType, origValue);
258                 return;
259             } else if (!modifiable) {
260                 LOG.warn("{} update ({} -> {}) is not allowed after controller start", propertyType, origValue, newValue);
261                 return;
262             }
263         }
264
265         successCallback.accept(newValue);
266         LOG.info("{} config parameter is updated ({} -> {})", propertyType, origValue, newValue);
267     }
268
269     @Override
270     public void updateProperty(@Nonnull final PropertyType key, @Nonnull final Object value) {
271         try {
272             final String sValue = value.toString();
273             final Consumer<Object> successCallback;
274             final boolean modifiable;
275             final Object oldValue;
276             final Object newValue;
277
278             switch (key) {
279                 case RPC_REQUESTS_QUOTA:
280                     successCallback = (result) -> {
281                         rpcRequestsQuota = (int) result;
282
283                         if (initialized) {
284                             rpcManager.setRpcRequestQuota(rpcRequestsQuota);
285                         }
286                     };
287
288                     oldValue = rpcRequestsQuota;
289                     newValue = Integer.valueOf(sValue);
290                     modifiable = true;
291                     break;
292                 case SWITCH_FEATURES_MANDATORY:
293                     successCallback = (result) -> {
294                         switchFeaturesMandatory = (boolean) result;
295
296                         if (initialized) {
297                             deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
298                         }
299                     };
300
301                     oldValue = switchFeaturesMandatory;
302                     newValue = Boolean.valueOf(sValue);
303                     modifiable = true;
304                     break;
305                 case GLOBAL_NOTIFICATION_QUOTA:
306                     successCallback = (result) -> {
307                         globalNotificationQuota = (long) result;
308
309                         if (initialized) {
310                             deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
311                         }
312                     };
313
314                     oldValue = globalNotificationQuota;
315                     newValue = Long.valueOf(sValue);
316                     modifiable = true;
317                     break;
318                 case IS_STATISTICS_POLLING_ON:
319                     successCallback = (result) -> {
320                         isStatisticsPollingOn = (boolean) result;
321
322                         if (initialized) {
323                             statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
324                         }
325                     };
326
327                     oldValue = isStatisticsPollingOn;
328                     newValue = Boolean.valueOf(sValue);
329                     modifiable = true;
330                     break;
331                 case IS_STATISTICS_RPC_ENABLED:
332                     successCallback = (result) -> {
333                         isStatisticsRpcEnabled = (boolean) result;
334
335                         if (initialized) {
336                             rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
337                         }
338                     };
339
340                     oldValue = isStatisticsRpcEnabled;
341                     newValue = Boolean.valueOf(sValue);
342                     modifiable = true;
343                     break;
344                 case BARRIER_INTERVAL_TIMEOUT_LIMIT:
345                     successCallback = (result) -> {
346                         barrierInterval = (long) result;
347
348                         if (initialized) {
349                             deviceManager.setBarrierInterval(barrierInterval);
350                         }
351                     };
352
353                     oldValue = barrierInterval;
354                     newValue = Long.valueOf(sValue);
355                     modifiable = true;
356                     break;
357                 case BARRIER_COUNT_LIMIT:
358                     successCallback = (result) -> {
359                         barrierCountLimit = (int) result;
360
361                         if (initialized) {
362                             deviceManager.setBarrierCountLimit(barrierCountLimit);
363                         }
364                     };
365
366                     oldValue = barrierCountLimit;
367                     newValue = Integer.valueOf(sValue);
368                     modifiable = true;
369                     break;
370                 case ECHO_REPLY_TIMEOUT:
371                     successCallback = (result) -> {
372                         echoReplyTimeout = (long) result;
373
374                         if (initialized) {
375                             connectionManager.setEchoReplyTimeout(echoReplyTimeout);
376                         }
377                     };
378
379                     oldValue = echoReplyTimeout;
380                     newValue = Long.valueOf(sValue);
381                     modifiable = true;
382                     break;
383                 case THREAD_POOL_MIN_THREADS:
384                     successCallback = (result) -> threadPoolMinThreads = (int) result;
385                     oldValue = threadPoolMinThreads;
386                     newValue = Integer.valueOf(sValue);
387                     modifiable = false;
388                     break;
389                 case THREAD_POOL_MAX_THREADS:
390                     successCallback = (result) -> threadPoolMaxThreads = (int) result;
391                     oldValue = threadPoolMaxThreads;
392                     newValue = Integer.valueOf(sValue);
393                     modifiable = false;
394                     break;
395                 case THREAD_POOL_TIMEOUT:
396                     successCallback = (result) -> threadPoolTimeout = (long) result;
397                     oldValue = threadPoolTimeout;
398                     newValue = Long.valueOf(sValue);
399                     modifiable = false;
400                     break;
401                 case ENABLE_FLOW_REMOVED_NOTIFICATION:
402                     successCallback = (result) -> {
403                         isFlowRemovedNotificationOn = (boolean) result;
404
405                         if (initialized) {
406                             deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
407                         }
408                     };
409
410                     oldValue = isFlowRemovedNotificationOn;
411                     newValue = Boolean.valueOf(sValue);
412                     modifiable = true;
413                     break;
414                 case SKIP_TABLE_FEATURES:
415                     successCallback = (result) -> {
416                         skipTableFeatures = (boolean) result;
417
418                         if (initialized) {
419                             deviceManager.setSkipTableFeatures(skipTableFeatures);
420                         }
421                     };
422
423                     oldValue = skipTableFeatures;
424                     newValue = Boolean.valueOf(sValue);
425                     modifiable = true;
426                     break;
427                 case BASIC_TIMER_DELAY:
428                     successCallback = (result) -> {
429                         basicTimerDelay = (long) result;
430
431                         if (initialized) {
432                             statisticsManager.setBasicTimerDelay(basicTimerDelay);
433                         }
434                     };
435
436                     oldValue = basicTimerDelay;
437                     newValue = Long.valueOf(sValue);
438                     modifiable = true;
439                     break;
440                 case MAXIMUM_TIMER_DELAY:
441                     successCallback = (result) -> {
442                         maximumTimerDelay = (long) result;
443
444                         if (initialized) {
445                             statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
446                         }
447                     };
448
449                     oldValue = maximumTimerDelay;
450                     newValue = Long.valueOf(sValue);
451                     modifiable = true;
452                     break;
453                 case USE_SINGLE_LAYER_SERIALIZATION:
454                     successCallback = (result) -> useSingleLayerSerialization = (boolean) result;
455                     oldValue = useSingleLayerSerialization;
456                     newValue = Boolean.valueOf(sValue);
457                     modifiable = false;
458                     break;
459                 default:
460                     LOG.warn("Unsupported configuration property '{}={}'", key, sValue);
461                     return;
462             }
463
464             doPropertyUpdate(key, modifiable, oldValue, newValue, successCallback);
465         } catch (final Exception ex) {
466             LOG.warn("Failed to read configuration property '{}={}', error: {}", key, value, ex);
467         }
468     }
469
470     @Override
471     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
472         return extensionConverterManager;
473     }
474
475     @Override
476     public void close() throws Exception {
477         initialized = false;
478         //TODO: consider wrapping each manager into try-catch
479         deviceManager.close();
480         rpcManager.close();
481         statisticsManager.close();
482
483         // Manually shutdown all remaining running threads in pool
484         threadPool.shutdown();
485     }
486
487     private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) {
488         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
489         try {
490             final String pathToMxBean = String.format("%s:type=%s",
491                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
492                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
493             final ObjectName name = new ObjectName(pathToMxBean);
494             mbs.registerMBean(messageIntelligenceAgency, name);
495         } catch (MalformedObjectNameException
496                 | NotCompliantMBeanException
497                 | MBeanRegistrationException
498                 | InstanceAlreadyExistsException e) {
499             LOG.warn("Error registering MBean {}", e);
500         }
501     }
502 }