X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=a19efba66b48b6f5d093522f97b93d7f8ef2c7d1;hb=c6f3d36b08270ee45900a789757c81474e72b4aa;hp=25d6df71b3f42f78cf3da8dec8a8e6e75a91ac41;hpb=2a4a0e5f27694abf010d2405e6717c801f916d1c;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index 25d6df71b3..a19efba66b 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -7,41 +7,55 @@ */ package org.opendaylight.openflowplugin.impl.device; -import javax.annotation.CheckForNull; -import javax.annotation.Nonnull; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.collect.Iterators; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.netty.util.TimerTask; +import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; - -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; -import io.netty.util.HashedWheelTimer; +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; -import org.opendaylight.controller.md.sal.binding.api.NotificationService; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; +import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceSynchronizeListener; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceValidListener; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler; -import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; +import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,35 +63,38 @@ import org.slf4j.LoggerFactory; /** * */ -public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper, AutoCloseable { +public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper { private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class); - private static final long TICK_DURATION = 10; // 0.5 sec. private final long globalNotificationQuota; - private ScheduledThreadPoolExecutor spyPool; + private final boolean switchFeaturesMandatory; + private final int spyRate = 10; private final DataBroker dataBroker; - private final HashedWheelTimer hashedWheelTimer; private TranslatorLibrary translatorLibrary; private DeviceInitializationPhaseHandler deviceInitPhaseHandler; - private NotificationService notificationService; - private NotificationPublishService notificationPublishService; + private DeviceTerminationPhaseHandler deviceTerminPhaseHandler; - private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); - private final MessageIntelligenceAgency messageIntelligenceAgency; + private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); - private final long barrierNanos = TimeUnit.MILLISECONDS.toNanos(500); - private final int maxQueueDepth = 25600; + private final long barrierIntervalNanos; + private final int barrierCountLimit; private ExtensionConverterProvider extensionConverterProvider; + private ScheduledThreadPoolExecutor spyPool; + private List deviceSynchronizedListeners; + private List deviceValidListeners; + + private final LifecycleConductor conductor; public DeviceManagerImpl(@Nonnull final DataBroker dataBroker, - @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency, - final long globalNotificationQuota) { + final long globalNotificationQuota, final boolean switchFeaturesMandatory, + final long barrierInterval, final int barrierCountLimit, + final LifecycleConductor lifecycleConductor) { + this.switchFeaturesMandatory = switchFeaturesMandatory; this.globalNotificationQuota = globalNotificationQuota; this.dataBroker = Preconditions.checkNotNull(dataBroker); - hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500); /* merge empty nodes to oper DS to predict any problems with missing parent for Node */ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); @@ -91,80 +108,91 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi throw new IllegalStateException(e); } - this.messageIntelligenceAgency = messageIntelligenceAgency; + this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval); + this.barrierCountLimit = barrierCountLimit; + + this.conductor = lifecycleConductor; + spyPool = new ScheduledThreadPoolExecutor(1); + this.deviceSynchronizedListeners = new ArrayList<>(); + this.deviceValidListeners = new ArrayList<>(); } @Override public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) { - deviceInitPhaseHandler = handler; + this.deviceInitPhaseHandler = handler; } @Override - public void onDeviceContextLevelUp(final DeviceContext deviceContext) { + public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo) throws Exception { // final phase - we have to add new Device to MD-SAL DataStore - Preconditions.checkNotNull(deviceContext); - try { - ((DeviceContextImpl) deviceContext).initialSubmitTransaction(); - deviceContext.onPublished(); - - } catch (final Exception e) { - LOG.warn("Node {} can not be add to OPERATIONAL DataStore yet because {} ", deviceContext.getDeviceState().getNodeId(), e.getMessage()); - LOG.trace("Problem with add node {} to OPERATIONAL DataStore", deviceContext.getDeviceState().getNodeId(), e); - try { - deviceContext.close(); - } catch (Exception e1) { - LOG.warn("Exception on device context close. ", e); - } - } - + LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId()); + DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo)); + ((DeviceContextImpl) deviceContext).initialSubmitTransaction(); + deviceContext.onPublished(); } @Override - public void deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception { + public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception { Preconditions.checkArgument(connectionContext != null); - Preconditions.checkState(!deviceContexts.containsKey(connectionContext.getNodeId()), - "Rejecting connection from node which is already connected and there exist deviceContext for it: {}", - connectionContext.getNodeId() - ); - LOG.info("Initializing New Connection DeviceContext for node:{}", connectionContext.getNodeId()); + DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); + /** + * This part prevent destroy another device context. Throwing here an exception result to propagate close connection + * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl} + * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close + */ + if (deviceContexts.containsKey(deviceInfo)) { + LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId()); + return false; + } + + LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", + connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId()); + + // Add Disconnect handler + connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this); // Cache this for clarity final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter(); //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published connectionAdapter.setPacketInFiltering(true); - final Short version = connectionContext.getFeatures().getVersion(); - final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version); + final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion()); connectionContext.setOutboundQueueProvider(outboundQueueProvider); final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = - connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos); + connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos); connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); - final DeviceState deviceState = createDeviceState(connectionContext); - final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, - hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary); + final DeviceState deviceState = new DeviceStateImpl(deviceInfo); + this.registerDeviceSynchronizeListeners(deviceState); + this.registerDeviceValidListeners(deviceState); - deviceContext.addDeviceContextClosedHandler(this); - Verify.verify(deviceContexts.putIfAbsent(connectionContext.getNodeId(), deviceContext) == null); + final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, + deviceState, + dataBroker, + conductor, + outboundQueueProvider, + translatorLibrary); + + Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed."); ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); - deviceContext.setNotificationService(notificationService); - deviceContext.setNotificationPublishService(notificationPublishService); + deviceContext.setNotificationPublishService(conductor.getNotificationPublishService()); updatePacketInRateLimiters(); final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( connectionAdapter, deviceContext); connectionAdapter.setMessageListener(messageListener); + notifyDeviceValidListeners(deviceInfo, true); - deviceCtxLevelUp(deviceContext); - } + deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo()); + + notifyDeviceSynchronizeListeners(deviceInfo, true); - private static DeviceStateImpl createDeviceState(final @Nonnull ConnectionContext connectionContext) { - return new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId()); + return true; } private void updatePacketInRateLimiters() { @@ -183,12 +211,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } } - void deviceCtxLevelUp(final DeviceContext deviceContext) { - deviceContext.getDeviceState().setValid(true); - LOG.trace("Device context level up called."); - deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext); - } - @Override public TranslatorLibrary oook() { return translatorLibrary; @@ -200,37 +222,35 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public void setNotificationService(final NotificationService notificationServiceParam) { - notificationService = notificationServiceParam; - } - - @Override - public void setNotificationPublishService(final NotificationPublishService notificationService) { - notificationPublishService = notificationService; - } + public void close() { + for (final Iterator iterator = Iterators.consumingIterator(deviceContexts.values().iterator()); + iterator.hasNext();) { + final DeviceContext deviceCtx = iterator.next(); + notifyDeviceValidListeners(deviceCtx.getDeviceInfo(), false); + deviceCtx.shutdownConnection(); + deviceCtx.shuttingDownDataStoreTransactions(); + } - @Override - public void close() throws Exception { - for (final DeviceContext deviceContext : deviceContexts.values()) { - deviceContext.close(); + if (spyPool != null) { + spyPool.shutdownNow(); + spyPool = null; } } @Override - public void onDeviceContextClosed(final DeviceContext deviceContext) { - LOG.trace("onDeviceContextClosed for Node {}", deviceContext.getDeviceState().getNodeId()); - deviceContexts.remove(deviceContext.getPrimaryConnectionContext().getNodeId()); + public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) { + LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId()); + deviceContexts.remove(deviceInfo); updatePacketInRateLimiters(); } @Override public void initialize() { - spyPool = new ScheduledThreadPoolExecutor(1); - spyPool.scheduleAtFixedRate(messageIntelligenceAgency, spyRate, spyRate, TimeUnit.SECONDS); + spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), spyRate, spyRate, TimeUnit.SECONDS); } @Override - public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) { + public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) { this.extensionConverterProvider = extensionConverterProvider; } @@ -238,4 +258,149 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi public ExtensionConverterProvider getExtensionConverterProvider() { return extensionConverterProvider; } + + @Override + public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) { + this.deviceTerminPhaseHandler = handler; + } + + @Override + public void onDeviceDisconnected(final ConnectionContext connectionContext) { + LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId()); + final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); + final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo); + + if (null == deviceCtx) { + LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId()); + return; + } + + if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) { + /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */ + deviceCtx.removeAuxiliaryConnectionContext(connectionContext); + } else { + notifyDeviceValidListeners(deviceInfo, false); + /* Device is disconnected and so we need to close TxManager */ + final ListenableFuture future = deviceCtx.shuttingDownDataStoreTransactions(); + Futures.addCallback(future, new FutureCallback() { + + @Override + public void onSuccess(final Void result) { + LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId()); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); + } + + @Override + public void onFailure(final Throwable t) { + LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); + } + }); + /* Add timer for Close TxManager because it could fain ind cluster without notification */ + final TimerTask timerTask = timeout -> { + if (!future.isDone()) { + LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId()); + future.cancel(false); + } + }; + conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS); + } + } + + @VisibleForTesting + void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){ + deviceContexts.put(deviceInfo, deviceContext); + } + + @Override + public T gainContext(final DeviceInfo deviceInfo) { + return (T) deviceContexts.get(deviceInfo); + } + + @Override + public ListenableFuture onClusterRoleChange(final DeviceInfo deviceInfo, final OfpRole role) { + DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo); + LOG.trace("onClusterRoleChange {} for node:", role, deviceInfo.getNodeId()); + if (OfpRole.BECOMEMASTER.equals(role)) { + return onDeviceTakeClusterLeadership(deviceInfo); + } + return ((DeviceContextImpl)deviceContext).getTransactionChainManager().deactivateTransactionManager(); + } + + @Override + public void registerDeviceSynchronizeListeners(final DeviceSynchronizeListener deviceSynchronizeListener) { + this.deviceSynchronizedListeners.add(deviceSynchronizeListener); + } + + @Override + public void notifyDeviceSynchronizeListeners(final DeviceInfo deviceInfo, final boolean deviceSynchronized) { + for (DeviceSynchronizeListener listener : deviceSynchronizedListeners) { + listener.deviceIsSynchronized(deviceInfo, deviceSynchronized); + } + } + + @Override + public void registerDeviceValidListeners(final DeviceValidListener deviceValidListener) { + this.deviceValidListeners.add(deviceValidListener); + } + + @Override + public void notifyDeviceValidListeners(final DeviceInfo deviceInfo, final boolean deviceValid) { + for (DeviceValidListener listener : deviceValidListeners) { + listener.deviceIsValid(deviceInfo, deviceValid); + } + } + + private ListenableFuture onDeviceTakeClusterLeadership(final DeviceInfo deviceInfo) { + LOG.trace("onDeviceTakeClusterLeadership for node: {}", deviceInfo.getNodeId()); + /* validation */ + StatisticsContext statisticsContext = conductor.getStatisticsContext(deviceInfo); + if (statisticsContext == null) { + final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceInfo.getDatapathId()); + LOG.warn(errMsg); + return Futures.immediateFailedFuture(new IllegalStateException(errMsg)); + } + DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo); + /* Prepare init info collecting */ + notifyDeviceSynchronizeListeners(deviceInfo, false); + ((DeviceContextImpl)deviceContext).getTransactionChainManager().activateTransactionManager(); + /* Init Collecting NodeInfo */ + final ListenableFuture initCollectingDeviceInfo = DeviceInitializationUtils.initializeNodeInformation( + deviceContext, switchFeaturesMandatory); + /* Init Collecting StatInfo */ + final ListenableFuture statPollFuture = Futures.transform(initCollectingDeviceInfo, + new AsyncFunction() { + + @Override + public ListenableFuture apply(@Nonnull final Void input) throws Exception { + statisticsContext.statListForCollectingInitialization(); + return statisticsContext.initialGatherDynamicData(); + } + }); + + return Futures.transform(statPollFuture, new Function() { + + @Override + public Void apply(final Boolean input) { + if (ConnectionContext.CONNECTION_STATE.RIP.equals(conductor.gainConnectionStateSafely(deviceInfo))) { + final String errMsg = String.format("We lost connection for Device %s, context has to be closed.", + deviceInfo.getNodeId()); + LOG.warn(errMsg); + throw new IllegalStateException(errMsg); + } + if (!input) { + final String errMsg = String.format("Get Initial Device %s information fails", + deviceInfo.getNodeId()); + LOG.warn(errMsg); + throw new IllegalStateException(errMsg); + } + LOG.debug("Get Initial Device {} information is successful", deviceInfo.getNodeId()); + notifyDeviceSynchronizeListeners(deviceInfo, true); + ((DeviceContextImpl)deviceContext).getTransactionChainManager().initialSubmitWriteTransaction(); + deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true); + return null; + } + }); + } + }