X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2FLifecycleConductorImpl.java;h=24e1d7e2ef07af03214fea9890b1004c976484e8;hb=dd2c78841822ecb8e4728a7848086f3c2da633d3;hp=ff4d1b32be4f4d63137325387833e6749d4f215a;hpb=329e9c0de4ceb8186602f728eddfb3be610265d0;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/LifecycleConductorImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/LifecycleConductorImpl.java index ff4d1b32be..24e1d7e2ef 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/LifecycleConductorImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/LifecycleConductorImpl.java @@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.impl; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -16,132 +17,232 @@ import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; +import org.opendaylight.openflowplugin.api.openflow.OFPManager; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceContextChangeListener; import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener; +import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry; +import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; +import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; +import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; +import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; +import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; +import org.opendaylight.openflowplugin.impl.util.MdSalRegistrationUtils; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - /** */ -public final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeListener, DeviceContextChangeListener { +final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeListener, DeviceContextChangeListener, ExtensionConverterProviderKeeper { private static final Logger LOG = LoggerFactory.getLogger(LifecycleConductorImpl.class); private static final int TICKS_PER_WHEEL = 500; private static final long TICK_DURATION = 10; // 0.5 sec. private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL); + private ExtensionConverterProvider extensionConverterProvider; private DeviceManager deviceManager; + private StatisticsManager statisticsManager; + private RpcManager rpcManager; private final MessageIntelligenceAgency messageIntelligenceAgency; - private ConcurrentHashMap serviceChangeListeners = new ConcurrentHashMap<>(); + private final ConvertorExecutor convertorExecutor; + private ConcurrentHashMap serviceChangeListeners = new ConcurrentHashMap<>(); + private NotificationPublishService notificationPublishService; + + LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency, ConvertorExecutor convertorExecutor) { + this.messageIntelligenceAgency = Preconditions.checkNotNull(messageIntelligenceAgency); + this.convertorExecutor = convertorExecutor; + } + + @Override + public ExtensionConverterProvider getExtensionConverterProvider() { + return extensionConverterProvider; + } - public LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency) { - Preconditions.checkNotNull(messageIntelligenceAgency); - this.messageIntelligenceAgency = messageIntelligenceAgency; + @Override + public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) { + this.extensionConverterProvider = extensionConverterProvider; } - public void setSafelyDeviceManager(final DeviceManager deviceManager) { - if (this.deviceManager == null) { - this.deviceManager = deviceManager; + @Override + public void setSafelyManager(final OFPManager manager){ + if (manager instanceof RpcManager) { + if (rpcManager != null) { + LOG.info("RPC manager {} is already defined in conductor. ", manager); + return; + } + this.rpcManager = (RpcManager) manager; + } else { + if (manager instanceof StatisticsManager) { + if (statisticsManager != null) { + LOG.info("Statistics manager {} is already defined in conductor. ", manager); + return; + } + this.statisticsManager = (StatisticsManager) manager; + } else { + if (manager instanceof DeviceManager) { + if (deviceManager != null) { + LOG.info("Device manager {} is already defined in conductor. ", manager); + return; + } + this.deviceManager = (DeviceManager) manager; + } + } } } - public void addOneTimeListenerWhenServicesChangesDone(final ServiceChangeListener manager, final NodeId nodeId){ - LOG.debug("Listener {} for service change for node {} registered.", manager, nodeId); - serviceChangeListeners.put(nodeId, manager); + @Override + public void addOneTimeListenerWhenServicesChangesDone(final ServiceChangeListener manager, final DeviceInfo deviceInfo){ + LOG.debug("Listener {} for service change for node {} registered.", manager, deviceInfo.getNodeId()); + serviceChangeListeners.put(deviceInfo, manager); } @VisibleForTesting - void notifyServiceChangeListeners(final NodeId nodeId, final boolean success){ + void notifyServiceChangeListeners(final DeviceInfo deviceInfo, final boolean success){ if (serviceChangeListeners.size() == 0) { return; } LOG.debug("Notifying registered listeners for service change, no. of listeners {}", serviceChangeListeners.size()); - for (final Map.Entry nodeIdServiceChangeListenerEntry : serviceChangeListeners.entrySet()) { - if (nodeIdServiceChangeListenerEntry.getKey().equals(nodeId)) { - LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), nodeId, success); - nodeIdServiceChangeListenerEntry.getValue().servicesChangeDone(nodeId, success); - serviceChangeListeners.remove(nodeId); + for (final Map.Entry nodeIdServiceChangeListenerEntry : serviceChangeListeners.entrySet()) { + if (nodeIdServiceChangeListenerEntry.getKey().equals(deviceInfo)) { + LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), deviceInfo, success); + nodeIdServiceChangeListenerEntry.getValue().servicesChangeDone(deviceInfo, success); + serviceChangeListeners.remove(deviceInfo); } } } @Override - public void roleInitializationDone(final NodeId nodeId, final boolean success) { + public void roleInitializationDone(final DeviceInfo deviceInfo, final boolean success) { if (!success) { - LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", nodeId); - closeConnection(nodeId); + LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", deviceInfo); + closeConnection(deviceInfo); } else { - LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", nodeId); + LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", deviceInfo); } } - public void closeConnection(final NodeId nodeId) { - LOG.debug("Close connection called for node {}", nodeId); - final DeviceContext deviceContext = getDeviceContext(nodeId); + public void closeConnection(final DeviceInfo deviceInfo) { + LOG.debug("Close connection called for node {}", deviceInfo); + final DeviceContext deviceContext = getDeviceContext(deviceInfo); if (null != deviceContext) { + deviceManager.notifyDeviceValidListeners(deviceInfo, false); deviceContext.shutdownConnection(); } } @Override - public void roleChangeOnDevice(final NodeId nodeId, final boolean success, final OfpRole newRole, final boolean initializationPhase) { + public void roleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole newRole) { - final DeviceContext deviceContext = getDeviceContext(nodeId); + final DeviceContext deviceContext = Preconditions.checkNotNull( + deviceManager.gainContext(deviceInfo), + "Something went wrong, device context for nodeId: %s doesn't exists", deviceInfo.getNodeId() + ); - if (null == deviceContext) { - LOG.warn("Something went wrong, device context for nodeId: {} doesn't exists"); - return; - } - if (!success) { - LOG.warn("Role change to {} in role context for node {} was NOT successful, closing connection", newRole, nodeId); - closeConnection(nodeId); + final RpcContext rpcContext = Preconditions.checkNotNull( + rpcManager.gainContext(deviceInfo), + "Something went wrong, rpc context for nodeId: %s doesn't exists", deviceInfo.getNodeId() + ); + + LOG.info("Role change to {} in role context for node {} was successful.", newRole, deviceInfo); + + final String logText; + + if (OfpRole.BECOMEMASTER.equals(newRole)) { + logText = "Start"; + fillDeviceFlowRegistry(deviceInfo, deviceContext.getDeviceFlowRegistry()); + MdSalRegistrationUtils.registerServices(rpcContext, deviceContext, this.extensionConverterProvider, convertorExecutor); + + if (rpcContext.isStatisticsRpcEnabled()) { + MdSalRegistrationUtils.registerStatCompatibilityServices( + rpcContext, + deviceContext, + notificationPublishService, convertorExecutor); + } } else { - if (initializationPhase) { - LOG.debug("Initialization phase skipping starting services."); - return; + logText = "Stopp"; + statisticsManager.stopScheduling(deviceInfo); + + // Clean device flow registry if we became slave + if (OfpRole.BECOMESLAVE.equals(newRole)) { + deviceContext.getDeviceFlowRegistry().close(); } - LOG.info("Role change to {} in role context for node {} was successful, staring/stopping services.", newRole, nodeId); - //TODO: This is old way to check if statistics is running, remove after statistics changes implemented - final DeviceState deviceState = deviceContext.getDeviceState(); - if (null != deviceState) { - if (OfpRole.BECOMEMASTER.equals(newRole) && (getDeviceContext(nodeId) != null)) { - deviceState.setRole(OfpRole.BECOMEMASTER); - } else { - deviceState.setRole(OfpRole.BECOMESLAVE); + MdSalRegistrationUtils.unregisterServices(rpcContext); + } + + final ListenableFuture onClusterRoleChange = deviceManager.onClusterRoleChange(deviceInfo, newRole); + Futures.addCallback(onClusterRoleChange, new FutureCallback() { + @Override + public void onSuccess(@Nullable final Void aVoid) { + LOG.info("{}ing services for node {} was successful", logText, deviceInfo); + if (newRole.equals(OfpRole.BECOMESLAVE)) { + notifyServiceChangeListeners(deviceInfo, true); } } - final ListenableFuture onClusterRoleChange = deviceContext.onClusterRoleChange(null, newRole); - Futures.addCallback(onClusterRoleChange, new FutureCallback() { - @Override - public void onSuccess(@Nullable final Void aVoid) { - LOG.info("Starting/Stopping services for node {} was successful", nodeId); - if (newRole.equals(OfpRole.BECOMESLAVE)) notifyServiceChangeListeners(nodeId, true); + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("{}ing services for node {} was NOT successful, closing connection", logText, deviceInfo); + closeConnection(deviceInfo); + } + }); + } + + private void fillDeviceFlowRegistry(final DeviceInfo deviceInfo, final DeviceFlowRegistry deviceFlowRegistry) { + // Fill device flow registry with flows from datastore + final ListenableFuture>> deviceFlowRegistryFill = deviceFlowRegistry.fill(); + + // Start statistics scheduling only after we finished initializing device flow registry + Futures.addCallback(deviceFlowRegistryFill, new FutureCallback>>() { + @Override + public void onSuccess(@Nullable List> result) { + if (LOG.isDebugEnabled()) { + // Count all flows we read from datastore for debugging purposes. + // This number do not always represent how many flows were actually added + // to DeviceFlowRegistry, because of possible duplicates. + long flowCount = Optional.fromNullable(result).asSet().stream() + .flatMap(Collection::stream) + .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream()) + .flatMap(flowCapableNode -> flowCapableNode.getTable().stream()) + .flatMap(table -> table.getFlow().stream()) + .count(); + + LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId()); } - @Override - public void onFailure(final Throwable throwable) { - LOG.warn("Starting/Stopping services for node {} was NOT successful, closing connection", nodeId); - closeConnection(nodeId); + statisticsManager.startScheduling(deviceInfo); + } + + @Override + public void onFailure(Throwable t) { + // If we manually cancelled this future, do not start scheduling of statistics + if (deviceFlowRegistryFill.isCancelled()) { + LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId()); + } else { + LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId(), t); + statisticsManager.startScheduling(deviceInfo); } - }); - } + } + }); } public MessageIntelligenceAgency getMessageIntelligenceAgency() { @@ -149,49 +250,61 @@ public final class LifecycleConductorImpl implements LifecycleConductor, RoleCha } @Override - public DeviceContext getDeviceContext(final NodeId nodeId){ - return deviceManager.getDeviceContextFromNodeId(nodeId); + public DeviceContext getDeviceContext(DeviceInfo deviceInfo){ + return deviceManager.gainContext(deviceInfo); } - public Short gainVersionSafely(final NodeId nodeId) { - return (null != getDeviceContext(nodeId)) ? getDeviceContext(nodeId).getPrimaryConnectionContext().getFeatures().getVersion() : null; + @Override + public StatisticsContext getStatisticsContext(DeviceInfo deviceInfo){ + return statisticsManager.gainContext(deviceInfo); } public Timeout newTimeout(@Nonnull TimerTask task, long delay, @Nonnull TimeUnit unit) { return hashedWheelTimer.newTimeout(task, delay, unit); } - public ConnectionContext.CONNECTION_STATE gainConnectionStateSafely(final NodeId nodeId){ - return (null != getDeviceContext(nodeId)) ? getDeviceContext(nodeId).getPrimaryConnectionContext().getConnectionState() : null; + @Override + public ConnectionContext.CONNECTION_STATE gainConnectionStateSafely(final DeviceInfo deviceInfo){ + return (null != getDeviceContext(deviceInfo)) ? getDeviceContext(deviceInfo).getPrimaryConnectionContext().getConnectionState() : null; } - public Long reserveXidForDeviceMessage(final NodeId nodeId){ - return null != getDeviceContext(nodeId) ? getDeviceContext(nodeId).reserveXidForDeviceMessage() : null; + @Override + public Long reserveXidForDeviceMessage(final DeviceInfo deviceInfo){ + return null != getDeviceContext(deviceInfo) ? getDeviceContext(deviceInfo).reserveXidForDeviceMessage() : null; } @Override - public void deviceStartInitializationDone(final NodeId nodeId, final boolean success) { + public void deviceStartInitializationDone(final DeviceInfo deviceInfo, final boolean success) { if (!success) { - LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", nodeId); - closeConnection(nodeId); + LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo); + closeConnection(deviceInfo); } else { - LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", nodeId); + LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", deviceInfo); } } @Override - public void deviceInitializationDone(final NodeId nodeId, final boolean success) { + public void deviceInitializationDone(final DeviceInfo deviceInfo, final boolean success) { if (!success) { - LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", nodeId); - closeConnection(nodeId); + LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo); + closeConnection(deviceInfo); } else { - LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", nodeId); + LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", deviceInfo); } } @VisibleForTesting - public boolean isServiceChangeListenersEmpty() { + boolean isServiceChangeListenersEmpty() { return this.serviceChangeListeners.isEmpty(); } + @Override + public NotificationPublishService getNotificationPublishService() { + return notificationPublishService; + } + + @Override + public void setNotificationPublishService(NotificationPublishService notificationPublishService) { + this.notificationPublishService = notificationPublishService; + } }