X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2FLifecycleConductorImpl.java;h=32b2d768faae2a0db2d2dca441c691cc8a7086d0;hb=d8cc382a8dbdcb7c89b68457b3ae0b6d576ee28f;hp=942f1527fee6e2f37e22bc896cbdafaedb04d510;hpb=3472444ed871f0172d179a048bd7815ef02dd3ab;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/LifecycleConductorImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/LifecycleConductorImpl.java index 942f1527fe..32b2d768fa 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/LifecycleConductorImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/LifecycleConductorImpl.java @@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.impl; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -16,12 +17,14 @@ import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; - +import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.openflowplugin.api.openflow.OFPManager; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; @@ -31,46 +34,75 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceContextChang import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener; +import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry; +import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; +import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency; +import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; +import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.util.MdSalRegistrationUtils; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** */ -final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeListener, DeviceContextChangeListener { +final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeListener, DeviceContextChangeListener, ExtensionConverterProviderKeeper { private static final Logger LOG = LoggerFactory.getLogger(LifecycleConductorImpl.class); private static final int TICKS_PER_WHEEL = 500; private static final long TICK_DURATION = 10; // 0.5 sec. private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL); + private ExtensionConverterProvider extensionConverterProvider; private DeviceManager deviceManager; private StatisticsManager statisticsManager; private RpcManager rpcManager; private final MessageIntelligenceAgency messageIntelligenceAgency; + private final ConvertorExecutor convertorExecutor; private ConcurrentHashMap serviceChangeListeners = new ConcurrentHashMap<>(); + private NotificationPublishService notificationPublishService; + + LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency, ConvertorExecutor convertorExecutor) { + this.messageIntelligenceAgency = Preconditions.checkNotNull(messageIntelligenceAgency); + this.convertorExecutor = convertorExecutor; + } - LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency) { - Preconditions.checkNotNull(messageIntelligenceAgency); - this.messageIntelligenceAgency = messageIntelligenceAgency; + @Override + public ExtensionConverterProvider getExtensionConverterProvider() { + return extensionConverterProvider; + } + + @Override + public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) { + this.extensionConverterProvider = extensionConverterProvider; } @Override public void setSafelyManager(final OFPManager manager){ - if (manager == null) { - LOG.info("Manager {} is already defined in conductor. ", manager); - } if (manager instanceof RpcManager) { + if (rpcManager != null) { + LOG.info("RPC manager {} is already defined in conductor. ", manager); + return; + } this.rpcManager = (RpcManager) manager; } else { if (manager instanceof StatisticsManager) { + if (statisticsManager != null) { + LOG.info("Statistics manager {} is already defined in conductor. ", manager); + return; + } this.statisticsManager = (StatisticsManager) manager; } else { if (manager instanceof DeviceManager) { + if (deviceManager != null) { + LOG.info("Device manager {} is already defined in conductor. ", manager); + return; + } this.deviceManager = (DeviceManager) manager; } } @@ -91,7 +123,7 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList LOG.debug("Notifying registered listeners for service change, no. of listeners {}", serviceChangeListeners.size()); for (final Map.Entry nodeIdServiceChangeListenerEntry : serviceChangeListeners.entrySet()) { if (nodeIdServiceChangeListenerEntry.getKey().equals(deviceInfo)) { - LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), deviceInfo, success); + LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), deviceInfo.getNodeId().getValue(), success); nodeIdServiceChangeListenerEntry.getValue().servicesChangeDone(deviceInfo, success); serviceChangeListeners.remove(deviceInfo); } @@ -101,10 +133,10 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList @Override public void roleInitializationDone(final DeviceInfo deviceInfo, final boolean success) { if (!success) { - LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", deviceInfo); + LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue()); closeConnection(deviceInfo); } else { - LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", deviceInfo); + LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", deviceInfo.getNodeId().getValue()); } } @@ -117,57 +149,79 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList } @Override - public void roleChangeOnDevice(final DeviceInfo deviceInfo, final boolean success, final OfpRole newRole, final boolean initializationPhase) { + public void roleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole newRole) { - final DeviceContext deviceContext = getDeviceContext(deviceInfo); + final DeviceContext deviceContext = Preconditions.checkNotNull( + deviceManager.gainContext(deviceInfo), + "Something went wrong, device context for nodeId: %s doesn't exists", deviceInfo.getNodeId().getValue() + ); - if (null == deviceContext) { - LOG.warn("Something went wrong, device context for nodeId: {} doesn't exists"); - return; - } - if (!success) { - LOG.warn("Role change to {} in role context for node {} was NOT successful, closing connection", newRole, deviceInfo); - closeConnection(deviceInfo); - } else { - if (initializationPhase) { - LOG.debug("Initialization phase skipping starting services."); - return; - } + final RpcContext rpcContext = Preconditions.checkNotNull( + rpcManager.gainContext(deviceInfo), + "Something went wrong, rpc context for nodeId: %s doesn't exists", deviceInfo.getNodeId().getValue() + ); - LOG.info("Role change to {} in role context for node {} was successful, starting/stopping services.", newRole, deviceInfo); + LOG.info("Role change to {} in role context for node {} was successful.", newRole, deviceInfo.getNodeId().getValue()); - final String logText; + if (OfpRole.BECOMEMASTER.equals(newRole)) { + fillDeviceFlowRegistry(deviceInfo, deviceContext.getDeviceFlowRegistry()); + MdSalRegistrationUtils.registerServices(rpcContext, deviceContext, this.extensionConverterProvider, convertorExecutor); - if (OfpRole.BECOMEMASTER.equals(newRole)) { - logText = "Start"; - statisticsManager.startScheduling(deviceInfo); - MdSalRegistrationUtils.registerMasterServices( - rpcManager.gainContext(deviceInfo), + if (rpcContext.isStatisticsRpcEnabled()) { + MdSalRegistrationUtils.registerStatCompatibilityServices( + rpcContext, deviceContext, - OfpRole.BECOMEMASTER); - } else { - logText = "Stopp"; - statisticsManager.stopScheduling(deviceInfo); - MdSalRegistrationUtils.registerSlaveServices( - rpcManager.gainContext(deviceInfo), - OfpRole.BECOMESLAVE); + notificationPublishService, convertorExecutor); } + } else { + statisticsManager.stopScheduling(deviceInfo); + + // Clean device flow registry if we became slave + if (OfpRole.BECOMESLAVE.equals(newRole)) { + deviceContext.getDeviceFlowRegistry().close(); + } + + MdSalRegistrationUtils.unregisterServices(rpcContext); + } - final ListenableFuture onClusterRoleChange = deviceContext.onClusterRoleChange(newRole); - Futures.addCallback(onClusterRoleChange, new FutureCallback() { - @Override - public void onSuccess(@Nullable final Void aVoid) { - LOG.info("{}ing services for node {} was successful", logText, deviceInfo); - if (newRole.equals(OfpRole.BECOMESLAVE)) notifyServiceChangeListeners(deviceInfo, true); + } + + private void fillDeviceFlowRegistry(final DeviceInfo deviceInfo, final DeviceFlowRegistry deviceFlowRegistry) { + // Fill device flow registry with flows from datastore + final ListenableFuture>> deviceFlowRegistryFill = deviceFlowRegistry.fill(); + + // Start statistics scheduling only after we finished initializing device flow registry + Futures.addCallback(deviceFlowRegistryFill, new FutureCallback>>() { + @Override + public void onSuccess(@Nullable List> result) { + if (LOG.isDebugEnabled()) { + // Count all flows we read from datastore for debugging purposes. + // This number do not always represent how many flows were actually added + // to DeviceFlowRegistry, because of possible duplicates. + long flowCount = Optional.fromNullable(result).asSet().stream() + .flatMap(Collection::stream) + .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream()) + .flatMap(flowCapableNode -> flowCapableNode.getTable().stream()) + .flatMap(table -> table.getFlow().stream()) + .count(); + + LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId().getValue()); } - @Override - public void onFailure(final Throwable throwable) { - LOG.warn("ing services for node {} was NOT successful, closing connection", logText, deviceInfo); - closeConnection(deviceInfo); + statisticsManager.startScheduling(deviceInfo); + } + + @Override + public void onFailure(Throwable t) { + // If we manually cancelled this future, do not start scheduling of statistics + if (deviceFlowRegistryFill.isCancelled()) { + LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId().getValue()); + } else { + LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId().getValue(), t); + statisticsManager.startScheduling(deviceInfo); } - }); - } + } + }); } public MessageIntelligenceAgency getMessageIntelligenceAgency() { @@ -176,19 +230,20 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList @Override public DeviceContext getDeviceContext(DeviceInfo deviceInfo){ - return deviceManager.getDeviceContextFromNodeId(deviceInfo); + return deviceManager.gainContext(deviceInfo); } @Override - public Short gainVersionSafely(final DeviceInfo deviceInfo) { - return (null != getDeviceContext(deviceInfo)) ? getDeviceContext(deviceInfo).getPrimaryConnectionContext().getFeatures().getVersion() : null; + public StatisticsContext getStatisticsContext(DeviceInfo deviceInfo){ + return statisticsManager.gainContext(deviceInfo); } public Timeout newTimeout(@Nonnull TimerTask task, long delay, @Nonnull TimeUnit unit) { return hashedWheelTimer.newTimeout(task, delay, unit); } - ConnectionContext.CONNECTION_STATE gainConnectionStateSafely(final DeviceInfo deviceInfo){ + @Override + public ConnectionContext.CONNECTION_STATE gainConnectionStateSafely(final DeviceInfo deviceInfo){ return (null != getDeviceContext(deviceInfo)) ? getDeviceContext(deviceInfo).getPrimaryConnectionContext().getConnectionState() : null; } @@ -200,26 +255,35 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList @Override public void deviceStartInitializationDone(final DeviceInfo deviceInfo, final boolean success) { if (!success) { - LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo); + LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue()); closeConnection(deviceInfo); } else { - LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", deviceInfo); + LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", deviceInfo.getNodeId().getValue()); } } @Override public void deviceInitializationDone(final DeviceInfo deviceInfo, final boolean success) { if (!success) { - LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo); + LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue()); closeConnection(deviceInfo); } else { - LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", deviceInfo); + LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", deviceInfo.getNodeId().getValue()); } } @VisibleForTesting - public boolean isServiceChangeListenersEmpty() { + boolean isServiceChangeListenersEmpty() { return this.serviceChangeListeners.isEmpty(); } + @Override + public NotificationPublishService getNotificationPublishService() { + return notificationPublishService; + } + + @Override + public void setNotificationPublishService(NotificationPublishService notificationPublishService) { + this.notificationPublishService = notificationPublishService; + } }