X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceContextImpl.java;h=f87bbaa8106762139d9e20a6ff4e6c72d32bce2d;hb=d1af0fd5a4053a10917f631bae42970c1960fd20;hp=9f8bd244a2f3dc9f164fd49b36378109544b6b13;hpb=84e650a6e5beed21e2855355f7bc79c9b8bdd526;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index 9f8bd244a2..f87bbaa810 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -8,21 +8,18 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Verify; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; +import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; -import java.math.BigInteger; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.annotation.Nonnull; +import io.netty.util.TimerTask; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; @@ -31,6 +28,7 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosed import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey; +import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; @@ -66,6 +64,7 @@ import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory; import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl; import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl; +import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl; @@ -77,6 +76,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; @@ -96,16 +97,29 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - */ -public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper{ +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper { private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class); @@ -115,10 +129,16 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi private static final float LOW_WATERMARK_FACTOR = 0.75f; // TODO: high water mark factor should be parametrized private static final float HIGH_WATERMARK_FACTOR = 0.95f; + + // Timeout in seconds after what we will give up on propagating role + private static final int SET_ROLE_TIMEOUT = 10; + private boolean initialized; private static final Long RETRY_DELAY = 100L; private static final int RETRY_COUNT = 3; + private SalRoleService salRoleService = null; + private final HashedWheelTimer hashedWheelTimer; private ConnectionContext primaryConnectionContext; private final DeviceState deviceState; private final DataBroker dataBroker; @@ -145,6 +165,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi private final ConvertorExecutor convertorExecutor; private volatile CONTEXT_STATE state; private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler; + private final DeviceManager myManager; + private Boolean isAddNotificationSent = false; DeviceContextImpl( @Nonnull final ConnectionContext primaryConnectionContext, @@ -153,9 +175,13 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Nonnull final TranslatorLibrary translatorLibrary, @Nonnull final DeviceManager manager, final ConvertorExecutor convertorExecutor, - final boolean skipTableFeatures) { + final boolean skipTableFeatures, + final HashedWheelTimer hashedWheelTimer, + final DeviceManager myManager) { this.primaryConnectionContext = primaryConnectionContext; this.deviceInfo = primaryConnectionContext.getDeviceInfo(); + this.hashedWheelTimer = hashedWheelTimer; + this.myManager = myManager; this.deviceState = new DeviceStateImpl(); this.dataBroker = dataBroker; this.auxiliaryConnectionContexts = new HashMap<>(); @@ -184,7 +210,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void initialSubmitTransaction() { - transactionChainManager.initialSubmitWriteTransaction(); + if (initialized) { + transactionChainManager.initialSubmitWriteTransaction(); + } } @Override @@ -219,7 +247,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi public void writeToTransaction(final LogicalDatastoreType store, final InstanceIdentifier path, final T data){ - if (Objects.nonNull(transactionChainManager)) { + if (initialized) { transactionChainManager.writeToTransaction(store, path, data, false); } } @@ -228,21 +256,21 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi public void writeToTransactionWithParentsSlow(final LogicalDatastoreType store, final InstanceIdentifier path, final T data){ - if (Objects.nonNull(transactionChainManager)) { + if (initialized) { transactionChainManager.writeToTransaction(store, path, data, true); } } @Override public void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier path) throws TransactionChainClosedException { - if (Objects.nonNull(transactionChainManager)) { + if (initialized) { transactionChainManager.addDeleteOperationTotTxChain(store, path); } } @Override public boolean submitTransaction() { - return Objects.nonNull(transactionChainManager) && transactionChainManager.submitWriteTransaction(); + return initialized && transactionChainManager.submitWriteTransaction(); } @Override @@ -292,17 +320,17 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification = flowRemovedTranslator.translate(flowRemoved, deviceInfo, null); - if(!deviceManager.getIsNotificationFlowRemovedOff()) { + if(deviceManager.isFlowRemovedNotificationOn()) { // Trigger off a notification notificationPublishService.offerNotification(flowRemovedNotification); } else if(LOG.isDebugEnabled()) { - LOG.debug("For nodeId={} isNotificationFlowRemovedOff={}", getDeviceInfo().getLOGValue(), deviceManager.getIsNotificationFlowRemovedOff()); + LOG.debug("For nodeId={} isFlowRemovedNotificationOn={}", getDeviceInfo().getLOGValue(), deviceManager.isFlowRemovedNotificationOn()); } final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener(); if (itemLifecycleListener != null) { //2. create registry key - final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(flowRemovedNotification); + final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification); //3. lookup flowId final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveIdForFlow(flowRegKey); //4. if flowId present: @@ -321,6 +349,26 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi } } + @Override + public void sendNodeAddedNotification() { + if (!isAddNotificationSent) { + isAddNotificationSent = true; + NodeUpdatedBuilder builder = new NodeUpdatedBuilder(); + builder.setId(getDeviceInfo().getNodeId()); + builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier())); + LOG.debug("Publishing node added notification for {}", builder.build()); + notificationPublishService.offerNotification(builder.build()); + } + } + + @Override + public void sendNodeRemovedNotification() { + NodeRemovedBuilder builder = new NodeRemovedBuilder(); + builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier())); + LOG.debug("Publishing node removed notification for {}", builder.build()); + notificationPublishService.offerNotification(builder.build()); + } + @Override public void processPortStatusMessage(final PortStatusMessage portStatus) { messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); @@ -479,15 +527,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return translatorLibrary; } - @Override - public synchronized void close() { - LOG.debug("closing deviceContext: {}, nodeId:{}", - getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress(), - getDeviceInfo().getLOGValue()); - // NOOP - throw new UnsupportedOperationException("Autocloseble.close will be removed soon"); - } - @Override public void setCurrentBarrierTimeout(final Timeout timeout) { barrierTaskTimeout = timeout; @@ -511,7 +550,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void onPublished() { Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState())); - setState(CONTEXT_STATE.WORKING); + this.state = CONTEXT_STATE.WORKING; primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false); for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) { switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false); @@ -557,26 +596,29 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi LOG.debug("ConnectionCtx for Node {} is in RIP state.", getDeviceInfo().getLOGValue()); return; } - /* Terminate Auxiliary Connection */ + + // Terminate Auxiliary Connection for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) { LOG.debug("Closing auxiliary connection {}", connectionContext.getNodeId()); connectionContext.closeConnection(false); } - /* Terminate Primary Connection */ + + // Terminate Primary Connection getPrimaryConnectionContext().closeConnection(true); - /* Close all Group Registry */ - deviceGroupRegistry.close(); - deviceFlowRegistry.close(); - deviceMeterRegistry.close(); + + // Close all datastore registries + if (initialized) { + deviceGroupRegistry.close(); + deviceFlowRegistry.close(); + deviceMeterRegistry.close(); + } } @Override public ListenableFuture shuttingDownDataStoreTransactions() { - ListenableFuture future = Futures.immediateFuture(null); - if (Objects.nonNull(this.transactionChainManager)) { - future = this.transactionChainManager.shuttingDown(); - } - return future; + return initialized + ? this.transactionChainManager.shuttingDown() + : Futures.immediateFuture(null); } @VisibleForTesting @@ -595,17 +637,25 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi } @Override - public void setState(CONTEXT_STATE state) { - this.state = state; + public ListenableFuture stopClusterServices() { + final ListenableFuture deactivateTxManagerFuture = initialized + ? transactionChainManager.deactivateTransactionManager() + : Futures.immediateFuture(null); + + final boolean connectionInterrupted = + this.getPrimaryConnectionContext() + .getConnectionState() + .equals(ConnectionContext.CONNECTION_STATE.RIP); + if (!connectionInterrupted) { + LOG.info("This controller instance is now acting as a non-owner for node {}", deviceInfo.getLOGValue()); + } + + return deactivateTxManagerFuture; } @Override - public ListenableFuture stopClusterServices(boolean deviceDisconnected) { - ListenableFuture future = Futures.immediateFuture(null); - if (Objects.nonNull(this.transactionChainManager)) { - future = this.transactionChainManager.deactivateTransactionManager(); - } - return future; + public void cleanupDeviceData() { + myManager.removeDeviceFromOperationalDS(deviceInfo); } @Override @@ -618,9 +668,21 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return this.deviceInfo; } + @Override + public void close() { + if (CONTEXT_STATE.TERMINATION.equals(getState())){ + if (LOG.isDebugEnabled()) { + LOG.debug("DeviceContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue()); + } + } else { + this.state = CONTEXT_STATE.TERMINATION; + } + sendNodeRemovedNotification(); + } + @Override public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){ - if (Objects.nonNull(this.transactionChainManager)) { + if (initialized) { this.transactionChainManager.setLifecycleService(lifecycleService); } } @@ -628,7 +690,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void replaceConnectionContext(final ConnectionContext connectionContext){ // Act like we are initializing the context - setState(CONTEXT_STATE.INITIALIZATION); + this.state = CONTEXT_STATE.INITIALIZATION; this.primaryConnectionContext = connectionContext; this.onPublished(); } @@ -638,6 +700,11 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return this.skipTableFeatures; } + @Override + public void setSalRoleService(@Nonnull SalRoleService salRoleService) { + this.salRoleService = salRoleService; + } + @Override public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) { this.clusterInitializationPhaseHandler = handler; @@ -664,6 +731,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return false; } + Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new RpcResultFutureCallback()); + return this.clusterInitializationPhaseHandler.onContextInstantiateService(getPrimaryConnectionContext()); } @@ -674,10 +743,71 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue()); } this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo); - this.deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier()); + this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier()); this.deviceGroupRegistry = new DeviceGroupRegistryImpl(); this.deviceMeterRegistry = new DeviceMeterRegistryImpl(); this.initialized = true; } } + + @Nullable + @Override + public RequestContext createRequestContext() { + return new AbstractRequestContext(deviceInfo.reserveXidForDeviceMessage()) { + @Override + public void close() { + } + }; + + } + + ListenableFuture> sendRoleChangeToDevice(final OfpRole newRole) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId()); + } + + final Future> setRoleOutputFuture; + + if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) { + final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole) + .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build(); + + setRoleOutputFuture = this.salRoleService.setRole(setRoleInput); + + final TimerTask timerTask = timeout -> { + if (!setRoleOutputFuture.isDone()) { + LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT); + setRoleOutputFuture.cancel(true); + } + }; + + hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS); + } else { + LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion()); + return Futures.immediateFuture(null); + } + + return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture); + } + + @Override + public ListenableFuture> makeDeviceSlave() { + return sendRoleChangeToDevice(OfpRole.BECOMESLAVE); + } + + private class RpcResultFutureCallback implements FutureCallback> { + @Override + public void onSuccess(@Nullable RpcResult setRoleOutputRpcResult) { + if (LOG.isDebugEnabled()) { + LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue()); + } + sendNodeAddedNotification(); + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue()); + shutdownConnection(); + } + } }