X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=362dbe171418d6dec88ea25dd2fee6e1c96c355f;hb=6e511c65a597995ee38bb1e78c6b24967ca755c1;hp=e84e0308b7f46679fc25ae0a47130ec841761bc7;hpb=fb8e0654f26522cf1a41764448c7fc18284fdecf;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index e84e0308b7..362dbe1714 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -8,15 +8,19 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.collect.Iterators; +import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.TimerTask; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -25,29 +29,33 @@ import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; +import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceSynchronizeListener; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceValidListener; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler; import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; +import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +69,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi private final long globalNotificationQuota; private final boolean switchFeaturesMandatory; + private boolean isNotificationFlowRemovedOff; private final int spyRate = 10; @@ -68,24 +77,25 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi private TranslatorLibrary translatorLibrary; private DeviceInitializationPhaseHandler deviceInitPhaseHandler; private DeviceTerminationPhaseHandler deviceTerminPhaseHandler; - private NotificationPublishService notificationPublishService; - private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); + private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); private final long barrierIntervalNanos; private final int barrierCountLimit; private ExtensionConverterProvider extensionConverterProvider; private ScheduledThreadPoolExecutor spyPool; + private Set deviceSynchronizedListeners; + private Set deviceValidListeners; private final LifecycleConductor conductor; - private boolean isStatisticsRpcEnabled; public DeviceManagerImpl(@Nonnull final DataBroker dataBroker, final long globalNotificationQuota, final boolean switchFeaturesMandatory, final long barrierInterval, final int barrierCountLimit, - final LifecycleConductor lifecycleConductor) { + final LifecycleConductor lifecycleConductor, boolean isNotificationFlowRemovedOff) { this.switchFeaturesMandatory = switchFeaturesMandatory; this.globalNotificationQuota = globalNotificationQuota; + this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; this.dataBroker = Preconditions.checkNotNull(dataBroker); /* merge empty nodes to oper DS to predict any problems with missing parent for Node */ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); @@ -105,6 +115,8 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi this.conductor = lifecycleConductor; spyPool = new ScheduledThreadPoolExecutor(1); + this.deviceSynchronizedListeners = new HashSet<>(); + this.deviceValidListeners = new HashSet<>(); } @@ -117,7 +129,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo) throws Exception { // final phase - we have to add new Device to MD-SAL DataStore LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId()); - DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo.getNodeId())); + DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo)); ((DeviceContextImpl) deviceContext).initialSubmitTransaction(); deviceContext.onPublished(); } @@ -132,7 +144,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl} * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close */ - if (deviceContexts.containsKey(deviceInfo.getNodeId())) { + if (deviceContexts.containsKey(deviceInfo)) { LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId()); return false; } @@ -148,43 +160,42 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published connectionAdapter.setPacketInFiltering(true); - final Short version = connectionContext.getFeatures().getVersion(); - final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version); + final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion()); connectionContext.setOutboundQueueProvider(outboundQueueProvider); final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos); connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); - final DeviceState deviceState = createDeviceState(connectionContext); + final DeviceState deviceState = new DeviceStateImpl(deviceInfo); + this.addDeviceSynchronizeListener(deviceState); + this.addDeviceValidListener(deviceState); + final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, conductor, outboundQueueProvider, translatorLibrary, - switchFeaturesMandatory); + this); - Verify.verify(deviceContexts.putIfAbsent(deviceInfo.getNodeId(), deviceContext) == null, "DeviceCtx still not closed."); + Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed."); ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); - deviceContext.setStatisticsRpcEnabled(isStatisticsRpcEnabled); - deviceContext.setNotificationPublishService(notificationPublishService); + deviceContext.setNotificationPublishService(conductor.getNotificationPublishService()); updatePacketInRateLimiters(); final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( connectionAdapter, deviceContext); connectionAdapter.setMessageListener(messageListener); - deviceState.setValid(true); + notifyDeviceValidListeners(deviceInfo, true); deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo()); - return true; - } + notifyDeviceSynchronizeListeners(deviceInfo, true); - private static DeviceStateImpl createDeviceState(final @Nonnull ConnectionContext connectionContext) { - return new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId()); + return true; } private void updatePacketInRateLimiters() { @@ -213,16 +224,12 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi this.translatorLibrary = translatorLibrary; } - @Override - public void setNotificationPublishService(final NotificationPublishService notificationService) { - notificationPublishService = notificationService; - } - @Override public void close() { for (final Iterator iterator = Iterators.consumingIterator(deviceContexts.values().iterator()); iterator.hasNext();) { final DeviceContext deviceCtx = iterator.next(); + notifyDeviceValidListeners(deviceCtx.getDeviceInfo(), false); deviceCtx.shutdownConnection(); deviceCtx.shuttingDownDataStoreTransactions(); } @@ -236,7 +243,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi @Override public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) { LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId()); - deviceContexts.remove(deviceInfo.getNodeId()); + deviceContexts.remove(deviceInfo); updatePacketInRateLimiters(); } @@ -245,16 +252,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), spyRate, spyRate, TimeUnit.SECONDS); } - @Override - public DeviceContext getDeviceContextFromNodeId(final NodeId nodeId) { - return deviceContexts.get(nodeId); - } - - @Override - public void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled) { - this.isStatisticsRpcEnabled = isStatisticsRpcEnabled; - } - @Override public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) { this.extensionConverterProvider = extensionConverterProvider; @@ -274,7 +271,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi public void onDeviceDisconnected(final ConnectionContext connectionContext) { LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId()); final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo.getNodeId()); + final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo); if (null == deviceCtx) { LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId()); @@ -285,6 +282,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */ deviceCtx.removeAuxiliaryConnectionContext(connectionContext); } else { + notifyDeviceValidListeners(deviceInfo, false); /* Device is disconnected and so we need to close TxManager */ final ListenableFuture future = deviceCtx.shuttingDownDataStoreTransactions(); Futures.addCallback(future, new FutureCallback() { @@ -313,7 +311,109 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @VisibleForTesting - void addDeviceContextToMap(final NodeId nodeId, final DeviceContext deviceContext){ - deviceContexts.put(nodeId, deviceContext); + void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){ + deviceContexts.put(deviceInfo, deviceContext); + } + + @Override + public T gainContext(final DeviceInfo deviceInfo) { + return (T) deviceContexts.get(deviceInfo); + } + + @Override + public ListenableFuture onClusterRoleChange(final DeviceInfo deviceInfo, final OfpRole role) { + DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo); + LOG.trace("onClusterRoleChange {} for node:", role, deviceInfo.getNodeId()); + if (OfpRole.BECOMEMASTER.equals(role)) { + return onDeviceTakeClusterLeadership(deviceInfo); + } + return ((DeviceContextImpl)deviceContext).getTransactionChainManager().deactivateTransactionManager(); + } + + @Override + public void addDeviceSynchronizeListener(final DeviceSynchronizeListener deviceSynchronizeListener) { + this.deviceSynchronizedListeners.add(deviceSynchronizeListener); } + + @Override + public void notifyDeviceSynchronizeListeners(final DeviceInfo deviceInfo, final boolean deviceSynchronized) { + for (DeviceSynchronizeListener listener : deviceSynchronizedListeners) { + listener.deviceIsSynchronized(deviceInfo, deviceSynchronized); + } + } + + @Override + public void addDeviceValidListener(final DeviceValidListener deviceValidListener) { + this.deviceValidListeners.add(deviceValidListener); + } + + @Override + public void notifyDeviceValidListeners(final DeviceInfo deviceInfo, final boolean deviceValid) { + for (DeviceValidListener listener : deviceValidListeners) { + listener.deviceIsValid(deviceInfo, deviceValid); + } + } + + @Override + public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) { + this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; + } + + @Override + public boolean getIsNotificationFlowRemovedOff() { + return this.isNotificationFlowRemovedOff; + } + + private ListenableFuture onDeviceTakeClusterLeadership(final DeviceInfo deviceInfo) { + LOG.trace("onDeviceTakeClusterLeadership for node: {}", deviceInfo.getNodeId()); + /* validation */ + StatisticsContext statisticsContext = conductor.getStatisticsContext(deviceInfo); + if (statisticsContext == null) { + final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceInfo.getDatapathId()); + LOG.warn(errMsg); + return Futures.immediateFailedFuture(new IllegalStateException(errMsg)); + } + DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo); + /* Prepare init info collecting */ + notifyDeviceSynchronizeListeners(deviceInfo, false); + ((DeviceContextImpl)deviceContext).getTransactionChainManager().activateTransactionManager(); + /* Init Collecting NodeInfo */ + final ListenableFuture initCollectingDeviceInfo = DeviceInitializationUtils.initializeNodeInformation( + deviceContext, switchFeaturesMandatory); + /* Init Collecting StatInfo */ + final ListenableFuture statPollFuture = Futures.transform(initCollectingDeviceInfo, + new AsyncFunction() { + + @Override + public ListenableFuture apply(@Nonnull final Void input) throws Exception { + statisticsContext.statListForCollectingInitialization(); + return statisticsContext.initialGatherDynamicData(); + } + }); + + return Futures.transform(statPollFuture, new Function() { + + @Override + public Void apply(final Boolean input) { + if (ConnectionContext.CONNECTION_STATE.RIP.equals(conductor.gainConnectionStateSafely(deviceInfo))) { + final String errMsg = String.format("We lost connection for Device %s, context has to be closed.", + deviceInfo.getNodeId()); + LOG.warn(errMsg); + throw new IllegalStateException(errMsg); + } + if (!input) { + final String errMsg = String.format("Get Initial Device %s information fails", + deviceInfo.getNodeId()); + LOG.warn(errMsg); + throw new IllegalStateException(errMsg); + } + LOG.debug("Get Initial Device {} information is successful", deviceInfo.getNodeId()); + notifyDeviceSynchronizeListeners(deviceInfo, true); + ((DeviceContextImpl)deviceContext).getTransactionChainManager().initialSubmitWriteTransaction(); + deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true); + return null; + } + }); + } + }