X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=df49b1443769a6145fa754673ab673114b22c2b1;hb=617a0726d931230945a8a8d28a6e34be39f56b16;hp=a3ed4479c6245f134c380c9ebae4cb60c5a30323;hpb=6e1e20dd97e1c7605cb5c8e619ace7dbb80f4781;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index a3ed4479c6..df49b14437 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -10,6 +10,7 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -18,6 +19,7 @@ import io.netty.util.TimerTask; import java.util.Collections; import java.util.Iterator; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -25,10 +27,12 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; @@ -47,13 +51,17 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.Messa import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; +import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl; +import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,11 +74,12 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi private final long globalNotificationQuota; private final boolean switchFeaturesMandatory; - private boolean isNotificationFlowRemovedOff; - + private boolean isFlowRemovedNotificationOn; + private boolean skipTableFeatures; private static final int SPY_RATE = 10; private final DataBroker dataBroker; + private final DeviceInitializerProvider deviceInitializerProvider; private final ConvertorExecutor convertorExecutor; private TranslatorLibrary translatorLibrary; private DeviceInitializationPhaseHandler deviceInitPhaseHandler; @@ -79,36 +88,37 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); private final ConcurrentMap lifecycleServices = new ConcurrentHashMap<>(); - private final long barrierIntervalNanos; - private final int barrierCountLimit; + private long barrierIntervalNanos; + private int barrierCountLimit; + private ExtensionConverterProvider extensionConverterProvider; private ScheduledThreadPoolExecutor spyPool; private final ClusterSingletonServiceProvider singletonServiceProvider; private final NotificationPublishService notificationPublishService; private final MessageSpy messageSpy; private final HashedWheelTimer hashedWheelTimer; + private final boolean useSingleLayerSerialization; public DeviceManagerImpl(@Nonnull final DataBroker dataBroker, final long globalNotificationQuota, final boolean switchFeaturesMandatory, final long barrierInterval, final int barrierCountLimit, - final MessageSpy messageSpy, - final boolean isNotificationFlowRemovedOff, + final boolean isFlowRemovedNotificationOn, final ClusterSingletonServiceProvider singletonServiceProvider, final NotificationPublishService notificationPublishService, final HashedWheelTimer hashedWheelTimer, - final ConvertorExecutor convertorExecutor) { - this.switchFeaturesMandatory = switchFeaturesMandatory; - this.globalNotificationQuota = globalNotificationQuota; - this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; - this.dataBroker = Preconditions.checkNotNull(dataBroker); - this.convertorExecutor = convertorExecutor; - this.hashedWheelTimer = hashedWheelTimer; + final ConvertorExecutor convertorExecutor, + final boolean skipTableFeatures, + final boolean useSingleLayerSerialization, + final DeviceInitializerProvider deviceInitializerProvider) { + + this.dataBroker = dataBroker; + this.deviceInitializerProvider = deviceInitializerProvider; + /* merge empty nodes to oper DS to predict any problems with missing parent for Node */ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); - final NodesBuilder nodesBuilder = new NodesBuilder(); nodesBuilder.setNode(Collections.emptyList()); tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build()); @@ -119,13 +129,19 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi throw new IllegalStateException(e); } + this.switchFeaturesMandatory = switchFeaturesMandatory; + this.globalNotificationQuota = globalNotificationQuota; + this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn; + this.skipTableFeatures = skipTableFeatures; + this.convertorExecutor = convertorExecutor; + this.hashedWheelTimer = hashedWheelTimer; this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval); this.barrierCountLimit = barrierCountLimit; - - spyPool = new ScheduledThreadPoolExecutor(1); + this.spyPool = new ScheduledThreadPoolExecutor(1); this.singletonServiceProvider = singletonServiceProvider; this.notificationPublishService = notificationPublishService; this.messageSpy = messageSpy; + this.useSingleLayerSerialization = useSingleLayerSerialization; } @@ -140,14 +156,15 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId()); DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo)); deviceContext.onPublished(); + lifecycleService.registerDeviceRemovedHandler(this); lifecycleService.registerService(this.singletonServiceProvider); } @Override public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception { Preconditions.checkArgument(connectionContext != null); + final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); /* * This part prevent destroy another device context. Throwing here an exception result to propagate close connection * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl} @@ -155,14 +172,12 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi */ if (deviceContexts.containsKey(deviceInfo)) { DeviceContext deviceContext = deviceContexts.get(deviceInfo); + LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue()); if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) { - LOG.info("Node {} already connected but context state not in TERMINATION state, replacing connection context", + LOG.warn("Node {} context state not in TERMINATION state.", connectionContext.getDeviceInfo().getLOGValue()); - deviceContext.replaceConnectionContext(connectionContext); return ConnectionStatus.ALREADY_CONNECTED; } else { - LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", - connectionContext.getDeviceInfo().getLOGValue()); return ConnectionStatus.CLOSING; } } @@ -171,11 +186,12 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId()); // Add Disconnect handler - connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this); + connectionContext.setDeviceDisconnectedHandler(this); + // Cache this for clarity final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter(); - //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published + // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published connectionAdapter.setPacketInFiltering(true); final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion()); @@ -185,21 +201,29 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos); connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); + final LifecycleService lifecycleService = new LifecycleServiceImpl(); final DeviceContext deviceContext = new DeviceContextImpl( connectionContext, dataBroker, messageSpy, translatorLibrary, this, - convertorExecutor); + convertorExecutor, + skipTableFeatures, + hashedWheelTimer, + this, + useSingleLayerSerialization, + deviceInitializerProvider); - deviceContexts.putIfAbsent(deviceInfo, deviceContext); + deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext)); + deviceContexts.put(deviceInfo, deviceContext); - final LifecycleService lifecycleService = new LifecycleServiceImpl(); lifecycleService.setDeviceContext(deviceContext); deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService); - lifecycleServices.putIfAbsent(deviceInfo, lifecycleService); + lifecycleServices.put(deviceInfo, lifecycleService); + + addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService); deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory); @@ -216,22 +240,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi return ConnectionStatus.MAY_CONTINUE; } - private void updatePacketInRateLimiters() { - synchronized (deviceContexts) { - final int deviceContextsSize = deviceContexts.size(); - if (deviceContextsSize > 0) { - long freshNotificationLimit = globalNotificationQuota / deviceContextsSize; - if (freshNotificationLimit < 100) { - freshNotificationLimit = 100; - } - LOG.debug("fresh notification limit = {}", freshNotificationLimit); - for (final DeviceContext deviceContext : deviceContexts.values()) { - deviceContext.updatePacketInRateLimit(freshNotificationLimit); - } - } - } - } - @Override public TranslatorLibrary oook() { return translatorLibrary; @@ -251,35 +259,15 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi deviceCtx.shuttingDownDataStoreTransactions(); } - if (spyPool != null) { - spyPool.shutdownNow(); - spyPool = null; - } + Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow); + spyPool = null; + } @Override public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) { - - LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo); - if (LOG.isDebugEnabled()) { - LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue()); - } - updatePacketInRateLimiters(); - if (Objects.nonNull(lifecycleService)) { - try { - lifecycleService.close(); - LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue()); - } catch (Exception e) { - LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e); - } - } - - deviceContexts.remove(deviceInfo); - if (LOG.isDebugEnabled()) { - LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue()); - } - + Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close); } @Override @@ -308,51 +296,51 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo); - if (null == deviceCtx) { + if (Objects.isNull(deviceCtx)) { LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue()); return; } if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) { - LOG.debug("Device context for node {} is already is termination state, waiting for close all context"); + LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue()); return; } - deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION); + deviceCtx.close(); if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) { LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue()); - /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */ + // Connection is not PrimaryConnection so try to remove from Auxiliary Connections deviceCtx.removeAuxiliaryConnectionContext(connectionContext); } - //TODO: Auxiliary connections supported ? - { - /* Device is disconnected and so we need to close TxManager */ - final ListenableFuture future = deviceCtx.shuttingDownDataStoreTransactions(); - Futures.addCallback(future, new FutureCallback() { - - @Override - public void onSuccess(final Void result) { - LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue()); - deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); - } - @Override - public void onFailure(final Throwable t) { - LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue()); - LOG.trace("TxChainManager failed by closing. ", t); - deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); - } - }); - /* Add timer for Close TxManager because it could fain ind cluster without notification */ - final TimerTask timerTask = timeout -> { - if (!future.isDone()) { - LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue()); - future.cancel(false); - } - }; - hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS); - } + // TODO: Auxiliary connections supported ? + // Device is disconnected and so we need to close TxManager + final ListenableFuture future = deviceCtx.shuttingDownDataStoreTransactions(); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue()); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); + } + + @Override + public void onFailure(final Throwable t) { + LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue()); + LOG.trace("TxChainManager failed by closing. ", t); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); + } + }); + + // Add timer for Close TxManager because it could fail in cluster without notification + final TimerTask timerTask = timeout -> { + if (!future.isDone()) { + LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue()); + future.cancel(false); + } + }; + + hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS); } @VisibleForTesting @@ -361,18 +349,95 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public T gainContext(final DeviceInfo deviceInfo) { - return (T) deviceContexts.get(deviceInfo); + public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) { + this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff; + } + + @Override + public boolean isFlowRemovedNotificationOn() { + return this.isFlowRemovedNotificationOn; } + @Override - public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) { - this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; + public void setSkipTableFeatures(boolean skipTableFeaturesValue) { + skipTableFeatures = skipTableFeaturesValue; } @Override - public boolean getIsNotificationFlowRemovedOff() { - return this.isNotificationFlowRemovedOff; + public void setBarrierCountLimit(final int barrierCountLimit) { + this.barrierCountLimit = barrierCountLimit; + } + + @Override + public void setBarrierInterval(final long barrierTimeoutLimit) { + this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit); + } + + @Override + public CheckedFuture removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) { + final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction(); + delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier()); + final CheckedFuture delFuture = delWtx.submit(); + + Futures.addCallback(delFuture, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + if (LOG.isDebugEnabled()) { + LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue()); + } + } + + @Override + public void onFailure(@Nonnull final Throwable t) { + LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t); + } + }); + + return delFuture; + } + + + private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) { + Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback>() { + @Override + public void onSuccess(@Nullable RpcResult setRoleOutputRpcResult) { + if (LOG.isDebugEnabled()) { + LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue()); + } + } + + @Override + public void onFailure(Throwable throwable) { + LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue()); + lifecycleService.closeConnection(); + } + }); } + private void updatePacketInRateLimiters() { + synchronized (deviceContexts) { + final int deviceContextsSize = deviceContexts.size(); + if (deviceContextsSize > 0) { + long freshNotificationLimit = globalNotificationQuota / deviceContextsSize; + if (freshNotificationLimit < 100) { + freshNotificationLimit = 100; + } + if (LOG.isDebugEnabled()) { + LOG.debug("fresh notification limit = {}", freshNotificationLimit); + } + for (final DeviceContext deviceContext : deviceContexts.values()) { + deviceContext.updatePacketInRateLimit(freshNotificationLimit); + } + } + } + } + + public void onDeviceRemoved(DeviceInfo deviceInfo) { + deviceContexts.remove(deviceInfo); + LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue()); + + lifecycleServices.remove(deviceInfo); + LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue()); + } }