From: Tomas Slusny Date: Wed, 2 Aug 2017 11:15:42 +0000 (+0200) Subject: Add RoleManager and RoleContext X-Git-Tag: release/oxygen~126 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=0a19a71a8d3153816b86cc46690b3e88a89e2a1f;hp=2e7de34840ef72e7b57209fc9a57d334b0a6402a;p=openflowplugin.git Add RoleManager and RoleContext - Move sending of MASTER and SLAVE role from DeviceContext and ContextChainHolder to new RoleContext and RoleManager - Use hashed wheel timer for setting SLAVE role on device in case MASTER role was not propagated instead of ItemScheduler - Remove retries from SalRoleService See also: bug 8913 Change-Id: I808efef843ed936035fbf7759ae22d8976c80ca7 Signed-off-by: Tomas Slusny --- diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java index bbb4497ce4..ad0a8a1462 100644 --- a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java @@ -8,9 +8,7 @@ package org.opendaylight.openflowplugin.api.openflow.device; -import com.google.common.util.concurrent.ListenableFuture; import java.util.List; -import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; @@ -20,9 +18,6 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateL import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; -import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; -import org.opendaylight.yangtools.yang.common.RpcResult; /** * The central entity of OFP is the Device Context, which encapsulate the logical state of a switch @@ -107,18 +102,6 @@ public interface DeviceContext extends */ ItemLifeCycleRegistry getItemLifeCycleSourceRegistry(); - /** - * Setter for sal role service. - * @param salRoleService role service - */ - void setSalRoleService(@Nonnull SalRoleService salRoleService); - - /** - * Make device slave. - * @return listenable future from sal role service - */ - ListenableFuture> makeDeviceSlave(); - /** * Checks if device and controller supports single layer serialization. * @return true if single layer serialization is supported diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChain.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChain.java index 850447d0ef..928bb02275 100644 --- a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChain.java +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChain.java @@ -40,11 +40,6 @@ public interface ContextChain extends ClusterSingletonService, AutoCloseable { */ void registerServices(ClusterSingletonServiceProvider clusterSingletonServiceProvider); - /** - * After connect of device make this device SLAVE. - */ - void makeDeviceSlave(); - /** * Check all needed to be master. * @param mastershipState - state master on device, initial gather, initial submit, initial registry fill @@ -97,4 +92,4 @@ public interface ContextChain extends ClusterSingletonService, AutoCloseable { * @param deviceRemovedHandler device removed handler */ void registerDeviceRemovedHandler(@Nonnull DeviceRemovedHandler deviceRemovedHandler); -} +} \ No newline at end of file diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChainMastershipWatcher.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChainMastershipWatcher.java index 909a09f1fc..b4d5e51cf9 100644 --- a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChainMastershipWatcher.java +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChainMastershipWatcher.java @@ -51,6 +51,7 @@ public interface ContextChainMastershipWatcher { /** * Change to SLAVE role on device was not able. * @param deviceInfo connected switch identification + * @param reason reason */ - void onSlaveRoleNotAcquired(DeviceInfo deviceInfo); + void onSlaveRoleNotAcquired(DeviceInfo deviceInfo, String reason); } diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/role/RoleContext.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/role/RoleContext.java new file mode 100644 index 0000000000..5978284562 --- /dev/null +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/role/RoleContext.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowplugin.api.openflow.role; + +import org.opendaylight.openflowplugin.api.openflow.OFPContext; +import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService; + +/** + * Handles propagation of SLAVE and MASTER roles on connected devices. + */ +public interface RoleContext extends OFPContext, RequestContextStack { + /** + * Sets role service. + * + * @param salRoleService the sal role service + */ + void setRoleService(SalRoleService salRoleService); +} \ No newline at end of file diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/role/RoleManager.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/role/RoleManager.java new file mode 100644 index 0000000000..4bf1505d54 --- /dev/null +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/role/RoleManager.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowplugin.api.openflow.role; + +import javax.annotation.Nonnull; +import org.opendaylight.openflowplugin.api.openflow.OFPManager; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; + +/** + * Manages creation and termination of role contexts. + * @see org.opendaylight.openflowplugin.api.openflow.role.RoleContext + */ +public interface RoleManager extends OFPManager { + /** + * Create role context. + * + * @param deviceContext the device context + * @return the role context + */ + RoleContext createContext(@Nonnull DeviceContext deviceContext); +} \ No newline at end of file diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java index b8edaff73d..1bb91254d8 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java @@ -42,6 +42,7 @@ import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationS import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager; +import org.opendaylight.openflowplugin.api.openflow.role.RoleManager; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency; @@ -57,6 +58,7 @@ import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitiali import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl; import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector; import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector; +import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl; import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl; import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl; import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl; @@ -102,6 +104,7 @@ public class OpenFlowPluginProviderImpl implements private DeviceManager deviceManager; private RpcManager rpcManager; private StatisticsManager statisticsManager; + private RoleManager roleManager; private ConnectionManager connectionManager; private ThreadPoolExecutor threadPool; private ContextChainHolderImpl contextChainHolder; @@ -226,19 +229,20 @@ public class OpenFlowPluginProviderImpl implements hashedWheelTimer, convertorManager); + roleManager = new RoleManagerImpl(hashedWheelTimer); + contextChainHolder = new ContextChainHolderImpl( - hashedWheelTimer, threadPool, singletonServicesProvider, entityOwnershipService, - mastershipChangeServiceManager - ); + mastershipChangeServiceManager); statisticsManager.setReconciliationFrameworkRegistrar(mastershipChangeServiceManager); contextChainHolder.addManager(deviceManager); contextChainHolder.addManager(statisticsManager); contextChainHolder.addManager(rpcManager); + contextChainHolder.addManager(roleManager); connectionManager = new ConnectionManagerImpl(config, threadPool); connectionManager.setDeviceConnectedHandler(contextChainHolder); @@ -265,6 +269,7 @@ public class OpenFlowPluginProviderImpl implements gracefulShutdown(deviceManager); gracefulShutdown(rpcManager); gracefulShutdown(statisticsManager); + gracefulShutdown(roleManager); gracefulShutdown(threadPool); gracefulShutdown(hashedWheelTimer); unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME); diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index 7b4ba521f0..810673b138 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -10,10 +10,8 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.HashedWheelTimer; -import io.netty.util.TimerTask; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -106,16 +104,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.Pa import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,9 +123,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi // TODO: high water mark factor should be parametrized private static final float HIGH_WATERMARK_FACTOR = 0.95f; - // Timeout in seconds after what we will give up on propagating role - private static final int SET_ROLE_TIMEOUT = 10; - // Timeout in milliseconds after what we will give up on initializing device private static final int DEVICE_INIT_TIMEOUT = 9000; @@ -175,18 +165,17 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi private SalRoleService salRoleService; private ContextChainMastershipWatcher contextChainMastershipWatcher; - DeviceContextImpl( - @Nonnull final ConnectionContext primaryConnectionContext, - @Nonnull final DataBroker dataBroker, - @Nonnull final MessageSpy messageSpy, - @Nonnull final TranslatorLibrary translatorLibrary, - final ConvertorExecutor convertorExecutor, - final boolean skipTableFeatures, - final HashedWheelTimer hashedWheelTimer, - final boolean useSingleLayerSerialization, - final DeviceInitializerProvider deviceInitializerProvider, - final boolean isFlowRemovedNotificationOn, - final boolean switchFeaturesMandatory) { + DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext, + @Nonnull final DataBroker dataBroker, + @Nonnull final MessageSpy messageSpy, + @Nonnull final TranslatorLibrary translatorLibrary, + final ConvertorExecutor convertorExecutor, + final boolean skipTableFeatures, + final HashedWheelTimer hashedWheelTimer, + final boolean useSingleLayerSerialization, + final DeviceInitializerProvider deviceInitializerProvider, + final boolean isFlowRemovedNotificationOn, + final boolean switchFeaturesMandatory) { this.primaryConnectionContext = primaryConnectionContext; this.deviceInfo = primaryConnectionContext.getDeviceInfo(); @@ -625,11 +614,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3; } - @Override - public void setSalRoleService(@Nonnull SalRoleService salRoleService) { - this.salRoleService = salRoleService; - } - @Override public void instantiateServiceInstance() { lazyTransactionManagerInitialization(); @@ -673,9 +657,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi deviceInfo.toString())); } - Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), - new RpcResultFutureCallback(contextChainMastershipWatcher)); - final ListenableFuture>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill(); Futures.addCallback(deviceFlowRegistryFill, new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher)); @@ -713,72 +694,11 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return abstractRequestContext; } - private ListenableFuture> sendRoleChangeToDevice(final OfpRole newRole) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId()); - } - - final Future> setRoleOutputFuture; - - if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) { - final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole) - .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build(); - - setRoleOutputFuture = this.salRoleService.setRole(setRoleInput); - - final TimerTask timerTask = timeout -> { - if (!setRoleOutputFuture.isDone()) { - LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT); - setRoleOutputFuture.cancel(true); - } - }; - - hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS); - } else { - LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion()); - return Futures.immediateFuture(null); - } - - return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture); - } - - @Override - public ListenableFuture> makeDeviceSlave() { - return sendRoleChangeToDevice(OfpRole.BECOMESLAVE); - } - @Override public void onStateAcquired(final ContextChainState state) { hasState.set(true); } - private class RpcResultFutureCallback implements FutureCallback> { - - private final ContextChainMastershipWatcher contextChainMastershipWatcher; - - RpcResultFutureCallback(final ContextChainMastershipWatcher contextChainMastershipWatcher) { - this.contextChainMastershipWatcher = contextChainMastershipWatcher; - } - - @Override - public void onSuccess(@Nullable RpcResult setRoleOutputRpcResult) { - this.contextChainMastershipWatcher.onMasterRoleAcquired( - deviceInfo, - ContextChainMastershipState.MASTER_ON_DEVICE - ); - if (LOG.isDebugEnabled()) { - LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo); - } - } - - @Override - public void onFailure(final Throwable throwable) { - contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory( - deviceInfo, - "Was not able to set MASTER role on device"); - } - } - private class DeviceFlowRegistryCallback implements FutureCallback>> { private final ListenableFuture>> deviceFlowRegistryFill; private final ContextChainMastershipWatcher contextChainMastershipWatcher; diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index 414f127eb0..521565c465 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -38,7 +38,6 @@ import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionCon import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; -import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl; import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtil; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; @@ -168,7 +167,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi config.isEnableFlowRemovedNotification(), config.isSwitchFeaturesMandatory()); - deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext)); ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); deviceContext.setNotificationPublishService(notificationPublishService); diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java index 49ac750480..7ed91e26dc 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java @@ -11,7 +11,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; -import io.netty.util.HashedWheelTimer; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,12 +40,13 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; import org.opendaylight.openflowplugin.api.openflow.lifecycle.MasterChecker; import org.opendaylight.openflowplugin.api.openflow.lifecycle.OwnershipChangeListener; +import org.opendaylight.openflowplugin.api.openflow.role.RoleContext; +import org.opendaylight.openflowplugin.api.openflow.role.RoleManager; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil; -import org.opendaylight.openflowplugin.impl.util.ItemScheduler; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; @@ -60,23 +60,20 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class); private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}"; - private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000L; - private static final long CHECK_ROLE_MASTER_TOLERANCE = CHECK_ROLE_MASTER_TIMEOUT / 2; private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L; private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType"; private final Map contextChainMap = new ConcurrentHashMap<>(); private final EntityOwnershipListenerRegistration eosListenerRegistration; private final ClusterSingletonServiceProvider singletonServiceProvider; - private final ItemScheduler scheduler; private final ExecutorService executorService; private final OwnershipChangeListener ownershipChangeListener; private DeviceManager deviceManager; private RpcManager rpcManager; private StatisticsManager statisticsManager; + private RoleManager roleManager; - public ContextChainHolderImpl(final HashedWheelTimer timer, - final ExecutorService executorService, + public ContextChainHolderImpl(final ExecutorService executorService, final ClusterSingletonServiceProvider singletonServiceProvider, final EntityOwnershipService entityOwnershipService, final OwnershipChangeListener ownershipChangeListener) { @@ -86,12 +83,6 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker this.ownershipChangeListener.setMasterChecker(this); this.eosListenerRegistration = Objects.requireNonNull(entityOwnershipService .registerListener(ASYNC_SERVICE_ENTITY_TYPE, this)); - - this.scheduler = new ItemScheduler<>( - timer, - CHECK_ROLE_MASTER_TIMEOUT, - CHECK_ROLE_MASTER_TOLERANCE, - ContextChain::makeDeviceSlave); } @Override @@ -105,6 +96,9 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker } else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) { LOG.trace("Context chain holder: Statistics manager OK."); statisticsManager = (StatisticsManager) manager; + } else if (Objects.isNull(roleManager) && manager instanceof RoleManager) { + LOG.trace("Context chain holder: Role manager OK."); + roleManager = (RoleManager) manager; } } @@ -124,25 +118,25 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker statisticsContext.registerMastershipWatcher(this); LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); + final RoleContext roleContext = roleManager.createContext(deviceContext); + roleContext.registerMastershipWatcher(this); + LOG.debug("Role" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); + final ContextChain contextChain = new ContextChainImpl(this, connectionContext, executorService); contextChain.registerDeviceRemovedHandler(deviceManager); contextChain.registerDeviceRemovedHandler(rpcManager); contextChain.registerDeviceRemovedHandler(statisticsManager); + contextChain.registerDeviceRemovedHandler(roleManager); contextChain.registerDeviceRemovedHandler(this); contextChain.addContext(deviceContext); contextChain.addContext(rpcContext); contextChain.addContext(statisticsContext); + contextChain.addContext(roleContext); contextChainMap.put(deviceInfo, contextChain); LOG.debug("Context chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); deviceContext.onPublished(); - scheduler.add(deviceInfo, contextChain); - scheduler.startIfNotRunning(); - LOG.info("Started timer for setting SLAVE role on node {} if no role will be set in {}s.", - deviceInfo, - CHECK_ROLE_MASTER_TIMEOUT / 1000L); - contextChain.registerServices(singletonServiceProvider); } @@ -192,7 +186,6 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker @Override public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo, @Nonnull final ContextChainMastershipState mastershipState) { - scheduler.remove(deviceInfo); Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> { if (ownershipChangeListener.isReconciliationFrameworkRegistered()) { if (mastershipState == ContextChainMastershipState.INITIAL_SUBMIT) { @@ -216,13 +209,14 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker @Override public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) { - scheduler.remove(deviceInfo); ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo); + LOG.info("Role SLAVE was granted to device {}", deviceInfo); Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave); } @Override - public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) { + public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo, final String reason) { + LOG.warn("Not able to set SLAVE role on device {}, reason: {}", deviceInfo, reason); Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> destroyContextChain(deviceInfo)); } @@ -244,12 +238,14 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker @VisibleForTesting boolean checkAllManagers() { - return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager); + return Objects.nonNull(deviceManager) + && Objects.nonNull(rpcManager) + && Objects.nonNull(statisticsManager) + && Objects.nonNull(roleManager); } @Override public void close() throws Exception { - scheduler.close(); Map copyOfChains = new HashMap<>(contextChainMap); copyOfChains.keySet().forEach(this::destroyContextChain); copyOfChains.clear(); @@ -287,8 +283,7 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker } private void destroyContextChain(final DeviceInfo deviceInfo) { - scheduler.remove(deviceInfo); - + ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo); Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> { deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier()); contextChain.close(); @@ -335,7 +330,6 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker @Override public void onDeviceRemoved(final DeviceInfo deviceInfo) { - scheduler.remove(deviceInfo); contextChainMap.remove(deviceInfo); LOG.debug("Context chain removed for node {}", deviceInfo); } diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java index 094338dd56..2c13345b7d 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java @@ -8,7 +8,6 @@ package org.opendaylight.openflowplugin.impl.lifecycle; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.List; @@ -19,12 +18,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain; @@ -34,8 +31,6 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener; import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; -import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,7 +111,7 @@ public class ContextChainImpl implements ContextChain { } contextChainState.set(ContextChainState.CLOSED); - contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo); + unMasterMe(); // Close all connections to devices auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false)); @@ -159,18 +154,6 @@ public class ContextChainImpl implements ContextChain { LOG.info("Registered clustering services for node {}", deviceInfo); } - @Override - public void makeDeviceSlave() { - unMasterMe(); - - contexts.forEach(context -> { - if (context.map(DeviceContext.class::isInstance)) { - Futures.addCallback(context.map(DeviceContext.class::cast).makeDeviceSlave(), - new DeviceSlaveCallback(), executorService); - } - }); - } - @Override public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState) { switch (mastershipState) { @@ -279,16 +262,4 @@ public class ContextChainImpl implements ContextChain { masterStateOnDevice.set(false); rpcRegistration.set(false); } - - private final class DeviceSlaveCallback implements FutureCallback> { - @Override - public void onSuccess(@Nullable final RpcResult result) { - contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo); - } - - @Override - public void onFailure(@Nonnull final Throwable t) { - contextChainMastershipWatcher.onSlaveRoleNotAcquired(deviceInfo); - } - } -} +} \ No newline at end of file diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java new file mode 100644 index 0000000000..17ea56fe06 --- /dev/null +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowplugin.impl.role; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import java.util.Collection; +import java.util.HashSet; +import java.util.Objects; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; +import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher; +import org.opendaylight.openflowplugin.api.openflow.role.RoleContext; +import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; +import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RoleContextImpl implements RoleContext { + private static final Logger LOG = LoggerFactory.getLogger(RoleContextImpl.class); + + // Timeout after what we will give up on propagating role + private static final long SET_ROLE_TIMEOUT = 10000; + + private final DeviceInfo deviceInfo; + private final HashedWheelTimer timer; + private final AtomicReference>> lastRoleFuture = new AtomicReference<>(); + private final Collection> requestContexts = new HashSet<>(); + private final Timeout slaveTask; + private ContextChainMastershipWatcher contextChainMastershipWatcher; + private SalRoleService roleService; + + RoleContextImpl(@Nonnull final DeviceInfo deviceInfo, + @Nonnull final HashedWheelTimer timer, + final long checkRoleMasterTimeout) { + this.deviceInfo = deviceInfo; + this.timer = timer; + slaveTask = timer.newTimeout((t) -> makeDeviceSlave(), checkRoleMasterTimeout, TimeUnit.MILLISECONDS); + + LOG.info("Started timer for setting SLAVE role on device {} if no role will be set in {}s.", + deviceInfo, + checkRoleMasterTimeout / 1000L); + } + + @Override + public DeviceInfo getDeviceInfo() { + return deviceInfo; + } + + @Override + public void setRoleService(final SalRoleService salRoleService) { + roleService = salRoleService; + } + + @Override + public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) { + this.contextChainMastershipWatcher = contextChainMastershipWatcher; + } + + @Override + public void close() { + slaveTask.cancel(); + changeLastRoleFuture(null); + requestContexts.forEach(requestContext -> RequestContextUtil + .closeRequestContextWithRpcError(requestContext, "Connection closed.")); + requestContexts.clear(); + } + + @Override + public void instantiateServiceInstance() { + final ListenableFuture> future = sendRoleChangeToDevice(OfpRole.BECOMEMASTER); + changeLastRoleFuture(future); + Futures.addCallback(future, new MasterRoleCallback(), MoreExecutors.directExecutor()); + } + + @Override + public ListenableFuture closeServiceInstance() { + changeLastRoleFuture(null); + return Futures.immediateFuture(null); + } + + @Override + public RequestContext createRequestContext() { + final AbstractRequestContext ret = new AbstractRequestContext(deviceInfo.reserveXidForDeviceMessage()) { + @Override + public void close() { + requestContexts.remove(this); + } + }; + + requestContexts.add(ret); + return ret; + } + + @Nonnull + @Override + public ServiceGroupIdentifier getIdentifier() { + return deviceInfo.getServiceIdentifier(); + } + + private void changeLastRoleFuture(final ListenableFuture> newFuture) { + lastRoleFuture.getAndUpdate(lastFuture -> { + if (Objects.nonNull(lastFuture) && !lastFuture.isCancelled() && !lastFuture.isDone()) { + lastFuture.cancel(true); + } + + return newFuture; + }); + } + + private ListenableFuture> makeDeviceSlave() { + final ListenableFuture> future = sendRoleChangeToDevice(OfpRole.BECOMESLAVE); + changeLastRoleFuture(future); + Futures.addCallback(future, new SlaveRoleCallback(), MoreExecutors.directExecutor()); + return future; + } + + private ListenableFuture> sendRoleChangeToDevice(final OfpRole newRole) { + LOG.debug("Sending new role {} to device {}", newRole, deviceInfo); + + if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) { + final SetRoleInput setRoleInput = new SetRoleInputBuilder() + .setControllerRole(newRole) + .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())) + .build(); + + final Future> setRoleOutputFuture = roleService.setRole(setRoleInput); + + final TimerTask timerTask = timeout -> { + if (!setRoleOutputFuture.isDone()) { + LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, + deviceInfo, SET_ROLE_TIMEOUT); + setRoleOutputFuture.cancel(true); + } + }; + + timer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.MILLISECONDS); + return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture); + } + + LOG.info("Device: {} with version: {} does not support role {}", deviceInfo, deviceInfo.getVersion(), newRole); + return Futures.immediateFuture(null); + } + + private final class MasterRoleCallback implements FutureCallback> { + @Override + public void onSuccess(@Nullable RpcResult setRoleOutputRpcResult) { + slaveTask.cancel(); + contextChainMastershipWatcher.onMasterRoleAcquired( + deviceInfo, + ContextChainMastershipState.MASTER_ON_DEVICE); + LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo); + } + + @Override + public void onFailure(@Nonnull final Throwable throwable) { + slaveTask.cancel(); + contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory( + deviceInfo, + "Was not able to propagate MASTER role on device. Error: " + throwable.toString()); + } + } + + private final class SlaveRoleCallback implements FutureCallback> { + @Override + public void onSuccess(@Nullable final RpcResult result) { + slaveTask.cancel(); + contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo); + LOG.debug("Role SLAVE was successfully set on device, node {}", deviceInfo); + } + + @Override + public void onFailure(@Nonnull final Throwable throwable) { + slaveTask.cancel(); + contextChainMastershipWatcher.onSlaveRoleNotAcquired(deviceInfo, + "Was not able to propagate SLAVE role on device. Error: " + throwable.toString()); + } + } +} \ No newline at end of file diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java new file mode 100644 index 0000000000..442fd20a45 --- /dev/null +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowplugin.impl.role; + +import io.netty.util.HashedWheelTimer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import javax.annotation.Nonnull; +import org.opendaylight.openflowplugin.api.openflow.OFPContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; +import org.opendaylight.openflowplugin.api.openflow.role.RoleContext; +import org.opendaylight.openflowplugin.api.openflow.role.RoleManager; +import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl; + +public class RoleManagerImpl implements RoleManager { + // Timeout after what we will give up on waiting for master role + private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000; + + private final ConcurrentMap contexts = new ConcurrentHashMap<>(); + private final HashedWheelTimer timer; + + public RoleManagerImpl(final HashedWheelTimer timer) { + this.timer = timer; + } + + @Override + public RoleContext createContext(@Nonnull final DeviceContext deviceContext) { + final DeviceInfo deviceInfo = deviceContext.getDeviceInfo(); + final RoleContextImpl roleContext = new RoleContextImpl( + deviceContext.getDeviceInfo(), + timer, CHECK_ROLE_MASTER_TIMEOUT); + + roleContext.setRoleService(new SalRoleServiceImpl(roleContext, deviceContext)); + contexts.put(deviceInfo, roleContext); + return roleContext; + } + + @Override + public void onDeviceRemoved(final DeviceInfo deviceInfo) { + contexts.remove(deviceInfo); + } + + @Override + public void close() { + contexts.values().forEach(OFPContext::close); + contexts.clear(); + } +} \ No newline at end of file diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/sal/SalRoleServiceImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/sal/SalRoleServiceImpl.java index f8e42d2402..8770d19c6e 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/sal/SalRoleServiceImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/sal/SalRoleServiceImpl.java @@ -8,46 +8,33 @@ package org.opendaylight.openflowplugin.impl.services.sal; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import java.math.BigInteger; import java.util.concurrent.Future; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack; import org.opendaylight.openflowplugin.api.openflow.device.Xid; -import org.opendaylight.openflowplugin.impl.role.RoleChangeException; import org.opendaylight.openflowplugin.impl.services.AbstractSimpleService; import org.opendaylight.openflowplugin.impl.services.RoleService; import org.opendaylight.openflowplugin.impl.services.util.ServiceException; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.ErrorType; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService; import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; -import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class SalRoleServiceImpl extends AbstractSimpleService implements SalRoleService { - private static final Logger LOG = LoggerFactory.getLogger(SalRoleServiceImpl.class); - private static final BigInteger MAX_GENERATION_ID = new BigInteger("ffffffffffffffff", 16); - private static final int MAX_RETRIES = 42; - - private static final String ROLE_REQUEST_UNSUPPORTED = ErrorType.ROLEREQUESTFAILED.name().concat(" code UNSUP"); - private final DeviceContext deviceContext; private final RoleService roleService; @@ -66,93 +53,26 @@ public final class SalRoleServiceImpl extends AbstractSimpleService> setRole(final SetRoleInput input) { LOG.info("SetRole called with input:{}", input); - final SettableFuture> resultFuture = SettableFuture.create(); - repeaterForChangeRole(resultFuture, input, 0); - /* Add Callback for release Guard */ - Futures.addCallback(resultFuture, new FutureCallback>() { - - @Override - public void onSuccess(final RpcResult result) { - LOG.debug("SetRoleService for Node: {} is ok Role: {}", input.getNode().getValue(), - input.getControllerRole()); - } - - @Override - public void onFailure(final Throwable t) { - LOG.error("SetRoleService set Role {} for Node: {} fail . Reason {}", input.getControllerRole(), - input.getNode().getValue(), t); - } - }); - return resultFuture; - } - - private void repeaterForChangeRole(final SettableFuture> future, final SetRoleInput input, - final int retryCounter) { - if (future.isCancelled()) { - future.setException(new RoleChangeException(String.format( - "Set Role for device %s stop because Future was canceled", input.getNode().getValue()))); - return; - } - if (retryCounter >= MAX_RETRIES) { - future.setException(new RoleChangeException(String.format("Set Role failed after %s tries on device %s", - MAX_RETRIES, input.getNode().getValue()))); - return; - } // Check current connection state final CONNECTION_STATE state = deviceContext.getPrimaryConnectionContext().getConnectionState(); switch (state) { - case RIP: - LOG.info("Device {} has been disconnected", input.getNode()); - future.setException(new Exception(String.format( - "Device connection doesn't exist anymore. Primary connection status : %s", state))); - return; - case WORKING: - // We can proceed - LOG.trace("Device {} has been working", input.getNode()); - break; - default: - LOG.warn("Device {} is in state {}, role change is not allowed", input.getNode(), state); - future.setException(new Exception(String.format("Unexcpected device connection status : %s", state))); - return; + case RIP: + LOG.info("Device {} has been disconnected", input.getNode()); + return Futures.immediateFailedFuture(new Exception(String + .format("Device connection doesn't exist anymore. Primary connection status : %s", + state))); + case WORKING: + // We can proceed + LOG.trace("Device {} has been working", input.getNode()); + break; + default: + LOG.warn("Device {} is in state {}, role change is not allowed", input.getNode(), state); + return Futures.immediateFailedFuture(new Exception(String + .format("Unexpected device connection status : %s", state))); } LOG.info("Requesting state change to {}", input.getControllerRole()); - final ListenableFuture> changeRoleFuture = tryToChangeRole(input.getControllerRole()); - Futures.addCallback(changeRoleFuture, new FutureCallback>() { - - @Override - public void onSuccess(final RpcResult result) { - if (result.isSuccessful()) { - LOG.debug("setRoleOutput received after roleChangeTask execution:{}", result); - future.set(RpcResultBuilder. success().withResult(result.getResult()).build()); - } else { - final boolean present = result - .getErrors() - .stream() - .anyMatch(rpcError -> (rpcError.getMessage().contains(ROLE_REQUEST_UNSUPPORTED))); - - if (!present) { - LOG.warn("setRole() failed with errors, will retry: {} times.", MAX_RETRIES - retryCounter); - repeaterForChangeRole(future, input, (retryCounter + 1)); - } else { - LOG.warn("setRole() failed with error - role request unsupported."); - future.set(result); - } - } - } - - @Override - public void onFailure(final Throwable t) { - if (!t.getMessage().contains(ROLE_REQUEST_UNSUPPORTED)) { - LOG.warn("Exception in setRole(), will retry: {} times.", t, MAX_RETRIES - retryCounter); - repeaterForChangeRole(future, input, (retryCounter + 1)); - } else { - LOG.warn("Exception in setRole() - role request unsupported.", t); - future.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, t.getMessage()).build()); - } - } - }); + return tryToChangeRole(input.getControllerRole()); } private ListenableFuture> tryToChangeRole(final OfpRole role) { @@ -160,7 +80,7 @@ public final class SalRoleServiceImpl extends AbstractSimpleService generationFuture = roleService.getGenerationIdFromDevice(getVersion()); - return Futures.transformAsync(JdkFutureAdapters.listenInPoolThread(generationFuture), (AsyncFunction>) generationId -> { + return Futures.transformAsync(JdkFutureAdapters.listenInPoolThread(generationFuture), generationId -> { LOG.debug("RoleChangeTask, GenerationIdFromDevice from device {} is {}", getDeviceInfo().getNodeId().getValue(), generationId); final BigInteger nextGenerationId = getNextGenerationId(generationId); LOG.debug("nextGenerationId received from device:{} is {}", getDeviceInfo().getNodeId().getValue(), nextGenerationId); @@ -176,4 +96,4 @@ public final class SalRoleServiceImpl extends AbstractSimpleService implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(ItemScheduler.class); - - private final HashedWheelTimer hashedWheelTimer; - private final Consumer action; - private final long timeoutMillis; - private final long toleranceMillis; - private final Map items = Collections.synchronizedMap(new HashMap<>()); - private final Map queue = Collections.synchronizedMap(new HashMap<>()); - private final Object scheduleLock = new Object(); - - private volatile long startTime = -1; - private volatile Timeout runningTimeout; - - /** - * Instantiates a new Item scheduler. - * - * @param hashedWheelTimer the hashed wheel timer - * @param timeoutMillis the timeout millis - * @param toleranceMillis the tolerance millis - * @param action the action - */ - public ItemScheduler(final HashedWheelTimer hashedWheelTimer, - final long timeoutMillis, - final long toleranceMillis, - final Consumer action) { - this.hashedWheelTimer = hashedWheelTimer; - this.action = action; - this.timeoutMillis = timeoutMillis; - this.toleranceMillis = toleranceMillis; - } - - /** - * Start scheduler timeout if it is not already running and if there are any items scheduled - */ - public void startIfNotRunning() { - synchronized (scheduleLock) { - if (Objects.nonNull(runningTimeout) || (items.isEmpty() && queue.isEmpty())) { - LOG.debug("Scheduler {} is already running or nothing is scheduled, skipping start.", this); - return; - } - - startTime = System.currentTimeMillis(); - LOG.debug("Scheduler {} started with configured timeout {}ms and scheduling tolerance {}ms.", - this, timeoutMillis, toleranceMillis); - - runningTimeout = hashedWheelTimer.newTimeout((timeout) -> { - synchronized (scheduleLock) { - LOG.debug("Running configured action on {} scheduled items for scheduler {}. There are {} items left in queue.", - items.size(), this, queue.size()); - items.forEach((key, item) -> action.accept(item)); - items.clear(); - items.putAll(queue); - queue.clear(); - close(); - } - - startIfNotRunning(); - }, timeoutMillis, TimeUnit.MILLISECONDS); - } - } - - /** - * Schedule item for processing - * - * @param key the item key - * @param item the item - */ - public void add(final K key, final V item) { - synchronized (scheduleLock) { - final long currentTime = System.currentTimeMillis(); - - if (currentTime - toleranceMillis <= startTime) { - LOG.debug("Adding {} to scheduled items for scheduler {}.", key, this); - items.put(key, item); - } else { - LOG.debug("Adding {} to scheduling queue for scheduler {}.", key, this); - queue.put(key, item); - } - } - } - - /** - * Remove item for processing - * @param key the item key - */ - public void remove(final K key) { - synchronized (scheduleLock) { - LOG.debug("Removing {} from scheduled items and queue for scheduler {}", key, this); - items.remove(key); - queue.remove(key); - - if (items.isEmpty() && queue.isEmpty()) { - close(); - } - } - } - - @Override - public void close() { - LOG.debug("Closing scheduler {} and cancelling all running tasks.", this); - startTime = -1; - - if (Objects.nonNull(runningTimeout)) { - runningTimeout.cancel(); - runningTimeout = null; - } - } -} diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java index 9cccb5e6e8..8f7b7c976f 100644 --- a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java @@ -241,7 +241,6 @@ public class DeviceContextImplTest { deviceInitializerProvider, true, false); - deviceContext.setSalRoleService(salRoleService); ((DeviceContextImpl) deviceContext).lazyTransactionManagerInitialization(); deviceContextSpy = Mockito.spy(deviceContext); diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImplTest.java index 8c6b03bef0..667b4a700e 100644 --- a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImplTest.java +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImplTest.java @@ -8,7 +8,6 @@ package org.opendaylight.openflowplugin.impl.lifecycle; import com.google.common.util.concurrent.Futures; -import io.netty.util.HashedWheelTimer; import java.util.concurrent.ExecutorService; import org.junit.Assert; import org.junit.Before; @@ -30,10 +29,11 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.OwnershipChangeListener; import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager; import org.opendaylight.openflowplugin.api.openflow.mastership.ReconciliationFrameworkEvent; import org.opendaylight.openflowplugin.api.openflow.mastership.ReconciliationFrameworkRegistration; +import org.opendaylight.openflowplugin.api.openflow.role.RoleContext; +import org.opendaylight.openflowplugin.api.openflow.role.RoleManager; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; @@ -49,20 +49,22 @@ public class ContextChainHolderImplTest { private static final String ENTITY_TEST = "EntityTest"; private static final String OPENFLOW_TEST = "openflow:test"; @Mock - private HashedWheelTimer timer; - @Mock private StatisticsManager statisticsManager; @Mock private RpcManager rpcManager; @Mock private DeviceManager deviceManager; @Mock + private RoleManager roleManager; + @Mock private StatisticsContext statisticsContext; @Mock private RpcContext rpcContext; @Mock private DeviceContext deviceContext; @Mock + private RoleContext roleContext; + @Mock private ConnectionContext connectionContext; @Mock private DeviceInfo deviceInfo; @@ -77,8 +79,6 @@ public class ContextChainHolderImplTest { @Mock private EntityOwnershipListenerRegistration entityOwnershipListenerRegistration; @Mock - private OwnershipChangeListener ownershipChangeListener; - @Mock private ReconciliationFrameworkEvent reconciliationFrameworkEvent; @Mock private FeaturesReply featuresReply; @@ -100,7 +100,7 @@ public class ContextChainHolderImplTest { Mockito.when(deviceManager.createContext(connectionContext)).thenReturn(deviceContext); Mockito.when(rpcManager.createContext(deviceContext)).thenReturn(rpcContext); Mockito.when(statisticsManager.createContext(deviceContext)).thenReturn(statisticsContext); - Mockito.when(deviceContext.makeDeviceSlave()).thenReturn(Futures.immediateFuture(null)); + Mockito.when(roleManager.createContext(deviceContext)).thenReturn(roleContext); Mockito.when(deviceContext.getDeviceInfo()).thenReturn(deviceInfo); Mockito.when(singletonServicesProvider.registerClusterSingletonService(Mockito.any())) @@ -113,15 +113,14 @@ public class ContextChainHolderImplTest { registration = manager.reconciliationFrameworkRegistration(reconciliationFrameworkEvent); contextChainHolder = new ContextChainHolderImpl( - timer, executorService, singletonServicesProvider, entityOwnershipService, - manager - ); + manager); contextChainHolder.addManager(statisticsManager); contextChainHolder.addManager(rpcManager); contextChainHolder.addManager(deviceManager); + contextChainHolder.addManager(roleManager); } @Test @@ -135,6 +134,7 @@ public class ContextChainHolderImplTest { Mockito.verify(deviceManager).createContext(Mockito.any(ConnectionContext.class)); Mockito.verify(rpcManager).createContext(Mockito.any(DeviceContext.class)); Mockito.verify(statisticsManager).createContext(Mockito.any(DeviceContext.class)); + Mockito.verify(roleManager).createContext(Mockito.any(DeviceContext.class)); } @@ -231,7 +231,7 @@ public class ContextChainHolderImplTest { public void notAbleToSetSlave() throws Exception { registration.close(); contextChainHolder.deviceConnected(connectionContext); - contextChainHolder.onSlaveRoleNotAcquired(deviceInfo); + contextChainHolder.onSlaveRoleNotAcquired(deviceInfo, "Test reason"); Mockito.verify(deviceContext).close(); Mockito.verify(statisticsContext).close(); Mockito.verify(rpcContext).close(); @@ -294,4 +294,4 @@ public class ContextChainHolderImplTest { contextChainHolder.ownershipChanged(ownershipChange); Mockito.verify(deviceManager,Mockito.never()).removeDeviceFromOperationalDS(Mockito.any()); } -} \ No newline at end of file +} diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImplTest.java index c8ceea35fa..c61a602937 100644 --- a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImplTest.java +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImplTest.java @@ -27,7 +27,6 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; -import org.opendaylight.openflowplugin.impl.role.RoleChangeException; @RunWith(MockitoJUnitRunner.class) public class ContextChainImplTest { @@ -118,21 +117,6 @@ public class ContextChainImplTest { Assert.assertEquals(contextChain.getIdentifier(), SERVICE_GROUP_IDENTIFIER); } - @Test - public void makeDeviceSlave() throws Exception { - Mockito.when(deviceContext.makeDeviceSlave()).thenReturn(Futures.immediateFuture(null)); - contextChain.makeDeviceSlave(); - Mockito.verify(contextChainMastershipWatcher).onSlaveRoleAcquired(Mockito.any(DeviceInfo.class)); - } - - @Test - public void makeDeviceSlaveFailure() throws Exception { - Mockito.when(deviceContext.makeDeviceSlave()) - .thenReturn(Futures.immediateFailedFuture(new RoleChangeException(TEST_NODE))); - contextChain.makeDeviceSlave(); - Mockito.verify(contextChainMastershipWatcher).onSlaveRoleNotAcquired(Mockito.any(DeviceInfo.class)); - } - @Test public void instantiateServiceInstanceFail() throws Exception { Mockito.doThrow(new IllegalStateException()).when(deviceContext).instantiateServiceInstance(); diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/role/RoleContextImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/role/RoleContextImplTest.java new file mode 100644 index 0000000000..05860018b0 --- /dev/null +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/role/RoleContextImplTest.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowplugin.impl.role; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.util.concurrent.Futures; +import io.netty.util.HashedWheelTimer; +import java.util.concurrent.Future; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher; +import org.opendaylight.openflowplugin.api.openflow.role.RoleContext; +import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; +import org.opendaylight.yangtools.yang.common.RpcResult; + +@RunWith(MockitoJUnitRunner.class) +public class RoleContextImplTest { + @Mock + private SalRoleService roleService; + @Mock + private ContextChainMastershipWatcher contextChainMastershipWatcher; + @Mock + private DeviceInfo deviceInfo; + @Mock + private DeviceContext deviceContext; + @Mock + private Future> setRoleFuture; + private RoleContext roleContext; + + @Before + public void setUp() throws Exception { + when(deviceInfo.getNodeInstanceIdentifier()).thenReturn(DeviceStateUtil + .createNodeInstanceIdentifier(new NodeId("openflow:1"))); + when(deviceInfo.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3); + when(roleService.setRole(any())).thenReturn(Futures.immediateFuture(null)); + + roleContext = new RoleContextImpl(deviceInfo, new HashedWheelTimer(), 20000); + roleContext.registerMastershipWatcher(contextChainMastershipWatcher); + roleContext.setRoleService(roleService); + } + + @After + public void tearDown() throws Exception { + roleContext.close(); + } + + @Test + public void instantiateServiceInstance() throws Exception { + roleContext.instantiateServiceInstance(); + verify(roleService).setRole(new SetRoleInputBuilder() + .setControllerRole(OfpRole.BECOMEMASTER) + .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())) + .build()); + verify(contextChainMastershipWatcher).onMasterRoleAcquired( + deviceInfo, + ContextChainMastershipState.MASTER_ON_DEVICE); + } + + @Test + public void closeServiceInstance() throws Exception { + when(setRoleFuture.isCancelled()).thenReturn(false); + when(setRoleFuture.isDone()).thenReturn(false); + when(roleService.setRole(any())).thenReturn(setRoleFuture); + roleContext.instantiateServiceInstance(); + roleContext.closeServiceInstance().get(); + verify(setRoleFuture).cancel(true); + } + +} \ No newline at end of file diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/sal/SalRoleServiceImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/sal/SalRoleServiceImplTest.java index 20f2635000..5a2a174e67 100644 --- a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/sal/SalRoleServiceImplTest.java +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/sal/SalRoleServiceImplTest.java @@ -8,7 +8,6 @@ package org.opendaylight.openflowplugin.impl.services.sal; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -47,7 +46,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetR import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -157,31 +155,6 @@ public class SalRoleServiceImplTest { } - @Test - public void testSetRoleUnsupported() throws Exception { - ListenableFuture> futureOutput = - RpcResultBuilder.failed() - .withError(ErrorType.APPLICATION, ROLES_UNSUPPORTED) - .buildFuture(); - - Mockito.when(mockRequestContext.getFuture()).thenReturn(futureOutput); - - SalRoleService salRoleService = new SalRoleServiceImpl(mockRequestContextStack, mockDeviceContext); - - SetRoleInput setRoleInput = new SetRoleInputBuilder() - .setControllerRole(OfpRole.BECOMESLAVE) - .setNode(nodeRef) - .build(); - - Future> future = salRoleService.setRole(setRoleInput); - - RpcResult roleOutputRpcResult = future.get(5, TimeUnit.SECONDS); - assertNotNull("RpcResult from future cannot be null.", roleOutputRpcResult); - assertFalse("RpcResult from future is successful.", roleOutputRpcResult.isSuccessful()); - assertEquals(ROLES_UNSUPPORTED, roleOutputRpcResult - .getErrors().iterator().next().getMessage()); - } - @Test public void testDuplicateRoles() throws Exception { // set role to slave