import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.util.HashedWheelTimer;
import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
-import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
-import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
+import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
+import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
private static final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl();
+ private static final int TICKS_PER_WHEEL = 500;
+ // 0.5 sec.
+ private static final long TICK_DURATION = 10;
+ private static final Integer DEFAULT_BARRIER_COUNT = 25600;
+ private static final Long DEFAULT_ECHO_TIMEOUT = 2000L;
+ private static final Long DEFAULT_BARRIER_TIMEOUT = 500L;
+
+ private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
private final int rpcRequestsQuota;
private final long globalNotificationQuota;
+ private final ConvertorManager convertorManager;
private long barrierInterval;
private int barrierCountLimit;
private long echoReplyTimeout;
private DeviceManager deviceManager;
- private RoleManager roleManager;
private RpcManager rpcManager;
private RpcProviderRegistry rpcProviderRegistry;
private StatisticsManager statisticsManager;
private ConnectionManager connectionManager;
private NotificationService notificationProviderService;
private NotificationPublishService notificationPublishService;
- private EntityOwnershipService entityOwnershipService;
-
private ExtensionConverterManager extensionConverterManager;
-
private DataBroker dataBroker;
private Collection<SwitchConnectionProvider> switchConnectionProviders;
private boolean switchFeaturesMandatory = false;
- private boolean isStatisticsPollingOff = false;
+ private boolean isStatisticsPollingOn = true;
private boolean isStatisticsRpcEnabled;
+ private boolean isFlowRemovedNotificationOn = true;
+ private boolean skipTableFeatures = true;
- private final LifecycleConductor conductor;
private final ThreadPoolExecutor threadPool;
+ private ClusterSingletonServiceProvider singletonServicesProvider;
public OpenFlowPluginProviderImpl(final long rpcRequestsQuota,
final long globalNotificationQuota,
Preconditions.checkNotNull(threadPoolMaxThreads),
Preconditions.checkNotNull(threadPoolTimeout), TimeUnit.SECONDS,
new SynchronousQueue<>(), "ofppool");
-
- conductor = new LifecycleConductorImpl(messageIntelligenceAgency);
+ convertorManager = ConvertorManagerFactory.createDefaultManager();
}
@Override
- public boolean isStatisticsPollingOff() {
- return isStatisticsPollingOff;
+ public boolean isStatisticsPollingOn() {
+ return isStatisticsPollingOn;
}
@Override
- public void setIsStatisticsPollingOff(final boolean isStatisticsPollingOff) {
- this.isStatisticsPollingOff = isStatisticsPollingOff;
+ public void setStatisticsPollingOn(final boolean isStatisticsPollingOn) {
+ this.isStatisticsPollingOn = isStatisticsPollingOn;
}
private void startSwitchConnections() {
- final List<ListenableFuture<Boolean>> starterChain = new ArrayList<>(switchConnectionProviders.size());
- for (final SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) {
- switchConnectionPrv.setSwitchConnectionHandler(connectionManager);
- final ListenableFuture<Boolean> isOnlineFuture = switchConnectionPrv.startup();
- starterChain.add(isOnlineFuture);
- }
-
- final ListenableFuture<List<Boolean>> srvStarted = Futures.allAsList(starterChain);
- Futures.addCallback(srvStarted, new FutureCallback<List<Boolean>>() {
+ Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
+ // Inject OpenflowPlugin custom serializers and deserializers into OpenflowJava
+ SerializerInjector.injectSerializers(switchConnectionProvider);
+ DeserializerInjector.injectDeserializers(switchConnectionProvider);
+
+ // Set handler of incoming connections and start switch connection provider
+ switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
+ return switchConnectionProvider.startup();
+ }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
@Override
public void onSuccess(final List<Boolean> result) {
- LOG.info("All switchConnectionProviders are up and running ({}).",
- result.size());
+ LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
}
@Override
return switchFeaturesMandatory;
}
- @Override
- public void setEntityOwnershipService(final EntityOwnershipService entityOwnershipService) {
- this.entityOwnershipService = entityOwnershipService;
- }
-
@Override
public void setBarrierCountLimit(final int barrierCountLimit) {
this.barrierCountLimit = barrierCountLimit;
this.echoReplyTimeout = echoReplyTimeout;
}
+ @Override
+ public void setFlowRemovedNotification(boolean isFlowRemovedNotificationOn) {
+ this.isFlowRemovedNotificationOn = this.isFlowRemovedNotificationOn;
+ }
+
+ @Override
+ public void setClusteringSingletonServicesProvider(ClusterSingletonServiceProvider singletonServicesProvider) {
+ this.singletonServicesProvider = singletonServicesProvider;
+ }
+
+ @Override
+ public void setSkipTableFeatures(final boolean skipTableFeatures){
+ this.skipTableFeatures = skipTableFeatures;
+ }
@Override
public void setSwitchFeaturesMandatory(final boolean switchFeaturesMandatory) {
Preconditions.checkNotNull(dataBroker, "missing data broker");
Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry");
Preconditions.checkNotNull(notificationProviderService, "missing notification provider service");
+ Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider");
extensionConverterManager = new ExtensionConverterManagerImpl();
// TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
switchFeaturesMandatory,
barrierInterval,
barrierCountLimit,
- conductor);
- ((ExtensionConverterProviderKeeper) conductor).setExtensionConverterProvider(extensionConverterManager);
- ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
-
- conductor.setSafelyManager(deviceManager);
- conductor.setNotificationPublishService(notificationPublishService);
+ getMessageIntelligenceAgency(),
+ isFlowRemovedNotificationOn,
+ singletonServicesProvider,
+ notificationPublishService,
+ hashedWheelTimer,
+ convertorManager,
+ skipTableFeatures);
- roleManager = new RoleManagerImpl(entityOwnershipService, dataBroker, conductor);
- statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff, conductor);
- conductor.setSafelyManager(statisticsManager);
-
- rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota, conductor);
- conductor.setSafelyManager(rpcManager);
+ ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
- roleManager.addRoleChangeListener((RoleChangeListener) conductor);
+ rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota, extensionConverterManager, convertorManager, notificationPublishService);
+ statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOn, hashedWheelTimer, convertorManager);
/* Initialization Phase ordering - OFP Device Context suite */
// CM -> DM -> SM -> RPC -> Role -> DM
connectionManager.setDeviceConnectedHandler(deviceManager);
deviceManager.setDeviceInitializationPhaseHandler(statisticsManager);
statisticsManager.setDeviceInitializationPhaseHandler(rpcManager);
- rpcManager.setDeviceInitializationPhaseHandler(roleManager);
- roleManager.setDeviceInitializationPhaseHandler(deviceManager);
+ rpcManager.setDeviceInitializationPhaseHandler(deviceManager);
/* Termination Phase ordering - OFP Device Context suite */
deviceManager.setDeviceTerminationPhaseHandler(rpcManager);
rpcManager.setDeviceTerminationPhaseHandler(statisticsManager);
- statisticsManager.setDeviceTerminationPhaseHandler(roleManager);
- roleManager.setDeviceTerminationPhaseHandler(deviceManager);
+ statisticsManager.setDeviceTerminationPhaseHandler(deviceManager);
rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
- TranslatorLibraryUtil.setBasicTranslatorLibrary(deviceManager);
+ TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
deviceManager.initialize();
startSwitchConnections();
}
+ @Override
+ public void update(Map<String,Object> props) {
+ LOG.debug("Update managed properties = {}", props.toString());
+
+ if(deviceManager != null) {
+ if (props.containsKey("notification-flow-removed-off")) {
+ deviceManager.setFlowRemovedNotificationOn(Boolean.valueOf(props.get("enable-flow-removed-notification").toString()));
+ }
+ if (props.containsKey("skip-table-features")) {
+ deviceManager.setSkipTableFeatures(Boolean.valueOf(props.get("skip-table-features").toString()));
+ }
+ if (props.containsKey("barrier-count-limit")) {
+ try {
+ deviceManager.setBarrierCountLimit(Integer.valueOf(props.get("barrier-count-limit").toString()));
+ } catch (NumberFormatException ex) {
+ deviceManager.setBarrierCountLimit(DEFAULT_BARRIER_COUNT);
+ }
+ }
+ if (props.containsKey("barrier-interval-timeout-limit")){
+ try {
+ deviceManager.setBarrierInterval(Long.valueOf(props.get("barrier-interval-timeout-limit").toString()));
+ } catch (NumberFormatException ex) {
+ deviceManager.setBarrierInterval(DEFAULT_BARRIER_TIMEOUT);
+ }
+ }
+ }
+
+ if(rpcManager != null && props.containsKey("is-statistics-rpc-enabled")){
+ rpcManager.setStatisticsRpcEnabled(Boolean.valueOf((props.get("is-statistics-rpc-enabled").toString())));
+ }
+
+ if (connectionManager != null && props.containsKey("echo-reply-timeout") ){
+ try {
+ connectionManager.setEchoReplyTimeout(Long.valueOf(props.get("echo-reply-timeout").toString()));
+ }catch (NumberFormatException ex){
+ connectionManager.setEchoReplyTimeout(DEFAULT_ECHO_TIMEOUT);
+ }
+ }
+
+ if(statisticsManager != null && props.containsKey("is-statistics-polling-on")){
+ statisticsManager.setIsStatisticsPollingOn(Boolean.valueOf(props.get("is-statistics-polling-on").toString()));
+ }
+ }
+
private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) {
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try {
rpcManager.close();
statisticsManager.close();
- // TODO: needs to close org.opendaylight.openflowplugin.impl.role.OpenflowOwnershipListener after RoleContexts are down
- // TODO: must not be executed prior to all living RoleContexts have been closed (via closing living DeviceContexts)
- roleManager.close();
-
// Manually shutdown all remaining running threads in pool
threadPool.shutdown();
}
-}
+}
\ No newline at end of file