X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=362dbe171418d6dec88ea25dd2fee6e1c96c355f;hb=6e511c65a597995ee38bb1e78c6b24967ca755c1;hp=31df880261d5dd668047c2b57febe1142531ca44;hpb=329e9c0de4ceb8186602f728eddfb3be610265d0;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index 31df880261..362dbe1714 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -8,16 +8,19 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.collect.Iterators; +import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.netty.util.Timeout; import io.netty.util.TimerTask; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -26,28 +29,33 @@ import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; +import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceSynchronizeListener; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceValidListener; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler; import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; +import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +69,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi private final long globalNotificationQuota; private final boolean switchFeaturesMandatory; + private boolean isNotificationFlowRemovedOff; private final int spyRate = 10; @@ -68,24 +77,25 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi private TranslatorLibrary translatorLibrary; private DeviceInitializationPhaseHandler deviceInitPhaseHandler; private DeviceTerminationPhaseHandler deviceTerminPhaseHandler; - private NotificationPublishService notificationPublishService; - private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); + private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); private final long barrierIntervalNanos; private final int barrierCountLimit; private ExtensionConverterProvider extensionConverterProvider; private ScheduledThreadPoolExecutor spyPool; + private Set deviceSynchronizedListeners; + private Set deviceValidListeners; private final LifecycleConductor conductor; - private boolean isStatisticsRpcEnabled; public DeviceManagerImpl(@Nonnull final DataBroker dataBroker, final long globalNotificationQuota, final boolean switchFeaturesMandatory, final long barrierInterval, final int barrierCountLimit, - final LifecycleConductor lifecycleConductor) { + final LifecycleConductor lifecycleConductor, boolean isNotificationFlowRemovedOff) { this.switchFeaturesMandatory = switchFeaturesMandatory; this.globalNotificationQuota = globalNotificationQuota; + this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; this.dataBroker = Preconditions.checkNotNull(dataBroker); /* merge empty nodes to oper DS to predict any problems with missing parent for Node */ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); @@ -105,6 +115,8 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi this.conductor = lifecycleConductor; spyPool = new ScheduledThreadPoolExecutor(1); + this.deviceSynchronizedListeners = new HashSet<>(); + this.deviceValidListeners = new HashSet<>(); } @@ -114,10 +126,10 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public void onDeviceContextLevelUp(final NodeId nodeId) throws Exception { + public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo) throws Exception { // final phase - we have to add new Device to MD-SAL DataStore - LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", nodeId); - DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(nodeId)); + LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId()); + DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo)); ((DeviceContextImpl) deviceContext).initialSubmitTransaction(); deviceContext.onPublished(); } @@ -126,19 +138,19 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception { Preconditions.checkArgument(connectionContext != null); - NodeId nodeId = connectionContext.getNodeId(); + DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); /** * This part prevent destroy another device context. Throwing here an exception result to propagate close connection * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl} * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close */ - if (deviceContexts.containsKey(nodeId)) { + if (deviceContexts.containsKey(deviceInfo)) { LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId()); return false; } LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", - connectionContext.getConnectionAdapter().getRemoteAddress(), nodeId); + connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId()); // Add Disconnect handler connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this); @@ -148,43 +160,42 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published connectionAdapter.setPacketInFiltering(true); - final Short version = connectionContext.getFeatures().getVersion(); - final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version); + final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion()); connectionContext.setOutboundQueueProvider(outboundQueueProvider); final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos); connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); - final DeviceState deviceState = createDeviceState(connectionContext); + final DeviceState deviceState = new DeviceStateImpl(deviceInfo); + this.addDeviceSynchronizeListener(deviceState); + this.addDeviceValidListener(deviceState); + final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, conductor, outboundQueueProvider, translatorLibrary, - switchFeaturesMandatory); + this); - Verify.verify(deviceContexts.putIfAbsent(nodeId, deviceContext) == null, "DeviceCtx still not closed."); + Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed."); ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); - deviceContext.setStatisticsRpcEnabled(isStatisticsRpcEnabled); - deviceContext.setNotificationPublishService(notificationPublishService); + deviceContext.setNotificationPublishService(conductor.getNotificationPublishService()); updatePacketInRateLimiters(); final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( connectionAdapter, deviceContext); connectionAdapter.setMessageListener(messageListener); - deviceState.setValid(true); + notifyDeviceValidListeners(deviceInfo, true); - deviceInitPhaseHandler.onDeviceContextLevelUp(nodeId); + deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo()); - return true; - } + notifyDeviceSynchronizeListeners(deviceInfo, true); - private static DeviceStateImpl createDeviceState(final @Nonnull ConnectionContext connectionContext) { - return new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId()); + return true; } private void updatePacketInRateLimiters() { @@ -213,16 +224,12 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi this.translatorLibrary = translatorLibrary; } - @Override - public void setNotificationPublishService(final NotificationPublishService notificationService) { - notificationPublishService = notificationService; - } - @Override public void close() { for (final Iterator iterator = Iterators.consumingIterator(deviceContexts.values().iterator()); iterator.hasNext();) { final DeviceContext deviceCtx = iterator.next(); + notifyDeviceValidListeners(deviceCtx.getDeviceInfo(), false); deviceCtx.shutdownConnection(); deviceCtx.shuttingDownDataStoreTransactions(); } @@ -234,9 +241,9 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public void onDeviceContextLevelDown(final DeviceContext deviceContext) { - LOG.debug("onDeviceContextClosed for Node {}", deviceContext.getDeviceState().getNodeId()); - deviceContexts.remove(deviceContext.getPrimaryConnectionContext().getNodeId(), deviceContext); + public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) { + LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId()); + deviceContexts.remove(deviceInfo); updatePacketInRateLimiters(); } @@ -245,16 +252,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), spyRate, spyRate, TimeUnit.SECONDS); } - @Override - public DeviceContext getDeviceContextFromNodeId(final NodeId nodeId) { - return deviceContexts.get(nodeId); - } - - @Override - public void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled) { - this.isStatisticsRpcEnabled = isStatisticsRpcEnabled; - } - @Override public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) { this.extensionConverterProvider = extensionConverterProvider; @@ -273,11 +270,11 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi @Override public void onDeviceDisconnected(final ConnectionContext connectionContext) { LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId()); - final NodeId nodeId = connectionContext.getNodeId(); - final DeviceContext deviceCtx = this.deviceContexts.get(nodeId); + final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); + final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo); if (null == deviceCtx) { - LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", nodeId); + LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId()); return; } @@ -285,31 +282,28 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */ deviceCtx.removeAuxiliaryConnectionContext(connectionContext); } else { + notifyDeviceValidListeners(deviceInfo, false); /* Device is disconnected and so we need to close TxManager */ final ListenableFuture future = deviceCtx.shuttingDownDataStoreTransactions(); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(final Void result) { - LOG.debug("TxChainManager for device {} is closed successful.", nodeId); - deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx); + LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId()); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); } @Override public void onFailure(final Throwable t) { - LOG.warn("TxChainManager for device {} failed by closing.", nodeId, t); - deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx); + LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); } }); /* Add timer for Close TxManager because it could fain ind cluster without notification */ - final TimerTask timerTask = new TimerTask() { - - @Override - public void run(final Timeout timeout) throws Exception { - if (!future.isDone()) { - LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", nodeId); - future.cancel(false); - } + final TimerTask timerTask = timeout -> { + if (!future.isDone()) { + LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId()); + future.cancel(false); } }; conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS); @@ -317,7 +311,109 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @VisibleForTesting - void addDeviceContextToMap(final NodeId nodeId, final DeviceContext deviceContext){ - deviceContexts.put(nodeId, deviceContext); + void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){ + deviceContexts.put(deviceInfo, deviceContext); } + + @Override + public T gainContext(final DeviceInfo deviceInfo) { + return (T) deviceContexts.get(deviceInfo); + } + + @Override + public ListenableFuture onClusterRoleChange(final DeviceInfo deviceInfo, final OfpRole role) { + DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo); + LOG.trace("onClusterRoleChange {} for node:", role, deviceInfo.getNodeId()); + if (OfpRole.BECOMEMASTER.equals(role)) { + return onDeviceTakeClusterLeadership(deviceInfo); + } + return ((DeviceContextImpl)deviceContext).getTransactionChainManager().deactivateTransactionManager(); + } + + @Override + public void addDeviceSynchronizeListener(final DeviceSynchronizeListener deviceSynchronizeListener) { + this.deviceSynchronizedListeners.add(deviceSynchronizeListener); + } + + @Override + public void notifyDeviceSynchronizeListeners(final DeviceInfo deviceInfo, final boolean deviceSynchronized) { + for (DeviceSynchronizeListener listener : deviceSynchronizedListeners) { + listener.deviceIsSynchronized(deviceInfo, deviceSynchronized); + } + } + + @Override + public void addDeviceValidListener(final DeviceValidListener deviceValidListener) { + this.deviceValidListeners.add(deviceValidListener); + } + + @Override + public void notifyDeviceValidListeners(final DeviceInfo deviceInfo, final boolean deviceValid) { + for (DeviceValidListener listener : deviceValidListeners) { + listener.deviceIsValid(deviceInfo, deviceValid); + } + } + + @Override + public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) { + this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; + } + + @Override + public boolean getIsNotificationFlowRemovedOff() { + return this.isNotificationFlowRemovedOff; + } + + private ListenableFuture onDeviceTakeClusterLeadership(final DeviceInfo deviceInfo) { + LOG.trace("onDeviceTakeClusterLeadership for node: {}", deviceInfo.getNodeId()); + /* validation */ + StatisticsContext statisticsContext = conductor.getStatisticsContext(deviceInfo); + if (statisticsContext == null) { + final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceInfo.getDatapathId()); + LOG.warn(errMsg); + return Futures.immediateFailedFuture(new IllegalStateException(errMsg)); + } + DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo); + /* Prepare init info collecting */ + notifyDeviceSynchronizeListeners(deviceInfo, false); + ((DeviceContextImpl)deviceContext).getTransactionChainManager().activateTransactionManager(); + /* Init Collecting NodeInfo */ + final ListenableFuture initCollectingDeviceInfo = DeviceInitializationUtils.initializeNodeInformation( + deviceContext, switchFeaturesMandatory); + /* Init Collecting StatInfo */ + final ListenableFuture statPollFuture = Futures.transform(initCollectingDeviceInfo, + new AsyncFunction() { + + @Override + public ListenableFuture apply(@Nonnull final Void input) throws Exception { + statisticsContext.statListForCollectingInitialization(); + return statisticsContext.initialGatherDynamicData(); + } + }); + + return Futures.transform(statPollFuture, new Function() { + + @Override + public Void apply(final Boolean input) { + if (ConnectionContext.CONNECTION_STATE.RIP.equals(conductor.gainConnectionStateSafely(deviceInfo))) { + final String errMsg = String.format("We lost connection for Device %s, context has to be closed.", + deviceInfo.getNodeId()); + LOG.warn(errMsg); + throw new IllegalStateException(errMsg); + } + if (!input) { + final String errMsg = String.format("Get Initial Device %s information fails", + deviceInfo.getNodeId()); + LOG.warn(errMsg); + throw new IllegalStateException(errMsg); + } + LOG.debug("Get Initial Device {} information is successful", deviceInfo.getNodeId()); + notifyDeviceSynchronizeListeners(deviceInfo, true); + ((DeviceContextImpl)deviceContext).getTransactionChainManager().initialSubmitWriteTransaction(); + deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true); + return null; + } + }); + } + }